| Server IP : 213.136.93.164 / Your IP : 216.73.216.104 Web Server : Apache System : Linux m14200.contabo.net 5.14.0-611.54.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Wed May 6 18:03:03 EDT 2026 x86_64 User : ki692510 ( 1047) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : OFF | Pkexec : OFF Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/internals/ |
Upload File : |
"""
Shared reader for the local feature flags file.
The file is written by:
- Go resident-agent FeatureFlags plugin (IM360 mode)
- Python FeatureFlagsSync plugin (AV mode)
Other subsystems (e.g. message_status_publisher) use this module
to check individual flag values at runtime.
Supported JSON shapes on disk (readers / ``is_enabled`` / ``get_params``):
- New shape ``{"flags": ["mqtt_tracking"], "params": {"flag": ["A", "B"]}}``
(mirrors the sync API response; carries per-flag string-list params).
- Legacy object ``{"mqtt_tracking": true, ...}`` (still accepted).
- JSON array of enabled names ``["mqtt_tracking"]`` (still accepted).
- Legacy wrapper ``{"flags": ["mqtt_tracking", ...]}`` (still accepted).
The sync API checksum collapses to the legacy sorted-names array when no
params are present, so this agent and older agents agree on the bool-only
case. With params, the canonical form expands to ``{"flags": [...], "params":
{...}}`` with all keys and list members sorted.
The sync plugin also writes ``FLAGS_PLAIN_PATH`` (``/var/imunify360/feature_flags``):
plain text, one enabled flag name per line (sorted), for scripts.
"""
from __future__ import annotations
import hashlib
import json
import os
from typing import Any
FLAGS_PATH = "/var/imunify360/feature_flags.json"
# Plain list of enabled flag names (one per line), same order as sorted JSON array.
FLAGS_PLAIN_PATH = "/var/imunify360/feature_flags"
# Flag name whose params list drives MQTT message-status enrichment.
MQTT_TRACKED_METHODS_FLAG = "mqtt_tracked_methods"
_cached_flags: dict[str, Any] = {}
_cached_params: dict[str, list[str]] = {}
# Pre-built frozenset for the MQTT tracked-methods allow-list. Cached
# alongside the raw params dict so the hot path (every Reportable message
# in the_sink._call_unlocked) avoids re-allocating a fresh frozenset and
# the list copy that get_params() would do. Invalidated by the same
# file-mtime trigger that invalidates _cached_params.
_cached_mqtt_methods: frozenset[str] = frozenset()
_cached_mtime: float = 0.0
def _normalize_flags_from_file(raw: Any) -> dict[str, Any]:
"""Map file JSON to a flat name->value dict for :func:`is_enabled`."""
if raw is None:
return {}
if isinstance(raw, list):
out: dict[str, Any] = {}
for item in raw:
if isinstance(item, str):
out[item] = True
return out
if isinstance(raw, dict):
inner = raw.get("flags")
if isinstance(inner, list):
return _normalize_flags_from_file(inner)
return raw
return {}
def _params_from_file(raw: Any) -> dict[str, list[str]]:
"""Extract ``params`` mapping from new-shape file content.
Only the new ``{"flags": [...], "params": {name: [...]}}`` shape carries
params; every other (legacy) shape returns an empty mapping.
"""
if not isinstance(raw, dict):
return {}
raw_params = raw.get("params")
if not isinstance(raw_params, dict):
return {}
out: dict[str, list[str]] = {}
for name, values in raw_params.items():
if not isinstance(name, str) or not isinstance(values, list):
continue
cleaned = [v for v in values if isinstance(v, str)]
if cleaned:
out[name] = cleaned
return out
def _read_state() -> tuple[dict[str, Any], dict[str, list[str]]]:
global _cached_flags, _cached_params, _cached_mqtt_methods, _cached_mtime
try:
mtime = os.path.getmtime(FLAGS_PATH)
except OSError:
_cached_flags = {}
_cached_params = {}
_cached_mqtt_methods = frozenset()
_cached_mtime = 0.0
return _cached_flags, _cached_params
if mtime == _cached_mtime:
return _cached_flags, _cached_params
try:
with open(FLAGS_PATH) as f:
raw = json.load(f)
_cached_flags = _normalize_flags_from_file(raw)
_cached_params = _params_from_file(raw)
except (OSError, json.JSONDecodeError):
_cached_flags = {}
_cached_params = {}
_cached_mqtt_methods = frozenset(
_cached_params.get(MQTT_TRACKED_METHODS_FLAG, ())
)
_cached_mtime = mtime
return _cached_flags, _cached_params
def _read_flags() -> dict[str, Any]:
flags, _ = _read_state()
return flags
def _read_params() -> dict[str, list[str]]:
_, params = _read_state()
return params
def enabled_flag_names_sorted(flags: Any) -> list[str]:
"""Return sorted enabled flag names for JSON and plain-text sidecar.
Accepts the same shapes as :func:`_normalize_flags_from_file` (array,
flat map, ``{"flags": [...]}``) so checksums and sidecars match Go
``enabledNamesSortedForChecksum`` / :func:`is_enabled`.
"""
if not isinstance(flags, (list, dict)):
raise TypeError(
f"flags must be list or dict, not {type(flags).__name__}"
)
normalized = _normalize_flags_from_file(flags)
return sorted(k for k, v in normalized.items() if v)
def canonical_sync_flag_list_bytes(names: list[str]) -> bytes:
"""JSON array bytes used for sync MD5 when no params are present
(matches correlation_api ``checksum_for_sync_flag_list``)."""
ordered = sorted(names)
return json.dumps(ordered, sort_keys=True, indent=2).encode()
def canonical_sync_response_bytes(
names: list[str], params: dict[str, list[str]]
) -> bytes:
"""JSON bytes for the sync MD5 over the full response shape.
Mirrors correlation_api ``checksum_for_sync_response``: collapses to
the legacy sorted-names array when ``params`` is empty so old agents
keep matching, otherwise expands to the deterministic
``{"flags": [...], "params": {...}}`` form with all keys and list
members sorted.
"""
if not params:
return canonical_sync_flag_list_bytes(names)
canonical = {
"flags": sorted(names),
"params": {k: sorted(v) for k, v in sorted(params.items())},
}
return json.dumps(canonical, sort_keys=True, indent=2).encode()
def sync_checksum_hex_from_flags_file(path: str) -> str:
"""MD5 hex of the canonical sync-response form for ``path``.
Returns "" if the file is missing or invalid. Computes the same MD5
the server returned, so a matching checksum lets the agent skip
the response payload on the next sync.
"""
try:
with open(path, encoding="utf-8") as f:
raw = json.load(f)
except (OSError, UnicodeDecodeError, json.JSONDecodeError):
return ""
names = enabled_flag_names_sorted(raw)
params = _params_from_file(raw)
payload = canonical_sync_response_bytes(names, params)
return hashlib.md5(payload, usedforsecurity=False).hexdigest()
def legacy_feature_flags_map_bytes(names: list[str]) -> bytes:
"""On-disk legacy JSON: ``{flag: true, ...}`` with sorted keys."""
d = {n: True for n in sorted({x for x in names if isinstance(x, str)})}
return json.dumps(d, sort_keys=True, indent=2).encode()
def sync_response_file_bytes(
names: list[str], params: dict[str, list[str]]
) -> bytes:
"""Persisted form for ``FLAGS_PATH`` carrying both flags and params.
Same canonical shape as ``canonical_sync_response_bytes`` so the file
is self-describing and round-trips through ``sync_checksum_hex_from_flags_file``.
"""
canonical = {
"flags": sorted(names),
"params": {k: sorted(v) for k, v in sorted(params.items())},
}
return json.dumps(canonical, sort_keys=True, indent=2).encode()
def plain_text_payload_for_enabled_flags(flags: Any) -> bytes:
"""Body for ``FLAGS_PLAIN_PATH``: one name per line, trailing newline if non-empty."""
names = enabled_flag_names_sorted(flags)
if not names:
return b""
return ("\n".join(names) + "\n").encode()
def serialize_feature_flags_file_payload(flags: Any) -> bytes:
"""Serialize dict flags for writing ``FLAGS_PATH`` (legacy map only)."""
if isinstance(flags, dict):
return json.dumps(flags, sort_keys=True, indent=2).encode()
raise TypeError(f"flags must be dict, not {type(flags).__name__}")
def is_enabled(flag_name: str, default: bool = False) -> bool:
"""Return whether *flag_name* is enabled.
If the file is missing, unreadable, or the flag is absent,
*default* is returned. Defaults to False so unknown flags are
treated as disabled unless the caller explicitly opts in.
"""
flags = _read_flags()
value = flags.get(flag_name)
if value is None:
return default
return bool(value)
def get_params(flag_name: str) -> list[str]:
"""Return the per-flag string params from the on-disk file.
Empty list when the file is missing/unreadable, the flag is unknown,
or the value did not come from the new structured shape (legacy
bool-only flags carry no params by definition).
"""
return list(_read_params().get(flag_name, ()))
def mqtt_tracked_methods() -> frozenset[str]:
"""Frozen set of method names whose status events should be enriched
for MQTT tracing. Driven entirely by the server-side
``mqtt_tracked_methods`` flag's params list — the agent has no
hard-coded list, so adding/removing tracked types is a server-side
config change with no agent rollout.
Cached: ``_read_state`` pre-builds the frozenset and invalidates it
when the flags file's mtime changes. On the hot path — every
Reportable message in ``the_sink._call_unlocked`` — this is a single
``os.stat`` syscall plus an identity-stable frozenset return. Two
consecutive calls within the same mtime window return the same
instance.
"""
_read_state()
return _cached_mqtt_methods