| Server IP : 213.136.93.164 / Your IP : 216.73.216.36 Web Server : Apache System : Linux m14200.contabo.net 5.14.0-611.54.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Wed May 6 18:03:03 EDT 2026 x86_64 User : ki692510 ( 1047) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : OFF | Pkexec : OFF Directory : /opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/public_hooks/lib/ |
Upload File : |
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import argparse
import inspect
import logging
import os
import re
import sys
import raven
import time
from io import StringIO
from contextlib import contextmanager
from functools import wraps
# Module-level logger; the hook dispatcher in this module reports skipped
# listeners, listener crashes and captured listener output through it.
logger = logging.getLogger(__name__)
def _sanitize_log_value(value):
    """Render *value* as text that cannot break a log record into lines.

    The value is converted with ``str()``; every line feed and carriage
    return in the result is replaced by the two-character sequences
    ``\\n`` and ``\\r`` respectively. No other character is altered.
    """
    line_break_escapes = {ord('\n'): '\\n', ord('\r'): '\\r'}
    return str(value).translate(line_break_escapes)
# Directory that listener modules must be located in: `hook_method` compares
# the directory of each subclass's module file against this path and skips
# subclasses defined anywhere else.
LISTENERS_DIRECTORY = '/usr/share/cloudlinux/hooks/listeners/'
# Hosting account / panel-owner name validator.
#
# Deny-list of dangerous characters, expressed as a negated character class.
# The previous allow-list `^[a-z_][a-z0-9._-]{0,31}$` (MR !30) over-narrowed:
# it rejected non-ASCII reseller/owner names that Plesk legitimately produces
# (e.g. `реселлер4` in lve-utils' Plesk i18n fixtures), which broke
# lve-utils.python_tests.test_cpapi_plesk on Plesk and stalled the nightly
# (CLOS-4470). Same threat model is still covered: control chars, whitespace,
# path separators, and shell metacharacters are rejected.
#
# Forbidden:
#   * `\x00-\x1f`            C0 control chars
#   * `\x7f-\x9f`            DEL and the C1 control chars (includes the
#                            8-bit CSI U+009B)
#                            (log-line breaks, terminal escapes)
#   * `\s`                   whitespace, including Unicode whitespace
#                            (CLI-arg ambiguity, log-line breaks)
#   * `/` `\`                path separators
#                            (path traversal inside listener scripts)
#   * `; & | \` $ ( ) < > ' "`  shell metacharacters
#     `! * ? [ ] { } ~`      (command injection if a listener spawns a
#                            shell; glob/brace/history expansion)
#
# Length 1..255 (matches typical filesystem NAME_MAX; generous for any panel).
# All other Unicode codepoints are allowed.
_VALID_NAME_RE = re.compile(
    r"\A[^\s/\\;&|`$()<>'\"!*?\[\]{}~\x00-\x1f\x7f-\x9f]{1,255}\Z"
)


def valid_name(value):
    """argparse type-callback for hosting account / panel-owner names.

    Used by post_modify_user.py, pre_modify_user.py, post_modify_admin.py,
    and post_modify_domain.py to reject values that contain shell
    metacharacters, path separators, control chars, or whitespace before
    dispatching to listener plugins.

    Accepts any Unicode string of length 1..255 that does not contain a
    forbidden character — see `_VALID_NAME_RE` above for the full list.

    :param value: candidate name as received from the command line
    :return: ``value`` unchanged when it is acceptable
    :raises argparse.ArgumentTypeError: if ``value`` is not a ``str``, is
        empty, is longer than 255 characters, or contains a forbidden
        character
    """
    if not isinstance(value, str) or not _VALID_NAME_RE.fullmatch(value):
        raise argparse.ArgumentTypeError(
            f'invalid name {value!r}: contains forbidden character or has '
            f'length outside 1..255')
    return value
@contextmanager
def capture_output(stdo, stde):
    """Temporarily point ``sys.stdout`` / ``sys.stderr`` at other streams.

    :param stdo: object installed as ``sys.stdout`` for the duration of the
        ``with`` body; a fresh ``StringIO`` is used when it is falsy
    :param stde: same, for ``sys.stderr``

    The streams that were active on entry are put back on exit, whether the
    body finished normally or raised.
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout = stdo if stdo else StringIO()
        sys.stderr = stde if stde else StringIO()
        yield
    finally:
        sys.stdout, sys.stderr = saved_streams
def hook_method(func):
    """
    Magic decorator that calls all subclass methods
    that override base decorated one.

    The decorated function's own body is never executed. Calling the
    decorated method iterates over the direct subclasses of the caller's
    class, instantiates each qualifying subclass without arguments and calls
    its same-named method with the original ``*args`` / ``**kwargs``.
    The wrapper always returns ``None``.

    Requirements:
     - subclass must be defined in .py file in LISTENERS_DIRECTORY
     - subclass must NOT start with '_' char
     - subclass must override base event method (the one with '@hook_method')

    Failures are isolated per listener: an exception raised while
    constructing a listener or while running its method is reported to
    Sentry and logged as a warning, and dispatch continues with the next
    subclass. Anything a listener writes to stdout / stderr during the call
    is captured and forwarded to the module logger (stdout at INFO, stderr
    at DEBUG).
    """
    @wraps(func)
    def _wrapped(self, *args, **kwargs):
        hook_name = func.__name__
        # loop-invariant, so normalise the expected directory once per call
        expected_dir = os.path.normpath(LISTENERS_DIRECTORY)
        # this only returns direct subclasses, so we can't make `proxies` now
        for subclass in self.__class__.__subclasses__():
            # inspect.getmodule() may return None and a module may have no
            # (or a None) __file__; such a class cannot be located in the
            # listeners directory, so skip it instead of crashing the
            # dispatch for every other listener
            module_file = getattr(inspect.getmodule(subclass), '__file__', None)
            if module_file is None:
                logger.warning('%s has no module file, so it is not in %s'
                               ' directory; skip', subclass, LISTENERS_DIRECTORY)
                continue
            listener_path = os.path.dirname(module_file)
            # skip child if it is not in expected directory
            if expected_dir != os.path.normpath(listener_path):
                logger.warning('%s is not in %s directory; it is in %s,'
                               ' skip', subclass, LISTENERS_DIRECTORY, listener_path)
                continue
            # skip internal classes
            if subclass.__name__.startswith('_'):
                continue
            # magic: get method only if it is defined in child (NOT in parent)
            try:
                listener = getattr(subclass(), hook_name)
            except Exception:
                # a listener that cannot even be constructed must not stop
                # the remaining listeners from being notified
                raven.base.Raven.captureException(
                    fingerprint=['{{ default }}', subclass.__name__, hook_name]
                )
                logger.warning('listener %s:%s could not be instantiated',
                               subclass.__name__, hook_name, exc_info=True)
                continue
            # a method inherited unchanged from the base still carries the
            # marker set at the bottom of this decorator
            if getattr(listener, 'is_magic_method', False):
                logger.debug('skip %s is not implemented in %s',
                             hook_name, subclass.__name__)
                continue

            logger.info('executing %s:%s', hook_name, subclass.__name__)
            now = time.time()
            stdout, stderr = StringIO(), StringIO()
            try:
                with capture_output(stdout, stderr):
                    listener(*args, **kwargs)
            except Exception:
                # use Raven carefully and only in places where
                # you sure that sentry is already initialized
                raven.base.Raven.captureException(
                    fingerprint=['{{ default }}', subclass.__name__, hook_name],
                    extra={'stdout': stdout.getvalue(), 'stderr': stderr.getvalue()}
                )
                logger.warning('listener %s:%s crashed', subclass.__name__, hook_name,
                               exc_info=True)
            finally:
                elapsed = time.time() - now
                stdout_str = stdout.getvalue()
                if stdout_str:
                    logger.info('captured stdout of %s:%s\n~BEGIN OUTPUT~\n%s\n~END OUTPUT~\n',
                                hook_name, subclass.__name__, stdout_str)
                stderr_str = stderr.getvalue()
                if stderr_str:
                    logger.debug('captured stderr of %s:%s\n~BEGIN OUTPUT~\n%s\n~END OUTPUT~\n',
                                 hook_name, subclass.__name__, stderr_str)
                logger.debug('running %s: %.4f elapsed', hook_name, elapsed)
        logger.info('%s executed by the user with uid %s and gid %s',
                    hook_name, os.geteuid(), os.getegid())
        # args / kwargs may carry caller-supplied text; keep the record on one line
        logger.info('ended %s(%s, %s)', hook_name,
                    _sanitize_log_value(args), _sanitize_log_value(kwargs))

    # special marker to determine overridden methods
    _wrapped.is_magic_method = True
    return _wrapped