403Webshell
Server IP : 213.136.93.164  /  Your IP : 216.73.216.104
Web Server : Apache
System : Linux m14200.contabo.net 5.14.0-611.54.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Wed May 6 18:03:03 EDT 2026 x86_64
User : ki692510 ( 1047)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model//tls_check.py
import functools
import logging
import threading
import time
import traceback

from playhouse.sqlite_ext import SqliteExtDatabase

from defence360agent.internals.global_scope import g


class OverridingReset(Exception):
    """Raised by reset() when the current thread already has a stored ident.

    Overwriting a previously stored value may point at a logic error, so
    every call site where this can legitimately happen is expected to
    handle the exception explicitly.
    """


# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Per-thread storage: reset() stores ``thread_ident_memo`` here and
# _validate() reads it back on every execute_sql() call.
_thread_local_storage = threading.local()

# Duration (seconds) above which _TimedAtomic logs a slow-transaction warning.
_SLOW_TXN_THRESHOLD_S = 5.0


class _TimedAtomic:
    def __init__(self, inner: object):
        self._inner = inner
        self._start: float = 0.0
        self._caller: str = ""

    def __enter__(self):
        self._start = time.monotonic()
        self._caller = "".join(traceback.format_stack(limit=4)[:-1])
        return self._inner.__enter__()

    def __exit__(self, *args):
        result = self._inner.__exit__(*args)
        elapsed = time.monotonic() - self._start
        if elapsed > _SLOW_TXN_THRESHOLD_S:
            logger.warning(
                "Slow transaction held for %.2fs\n%s",
                elapsed,
                self._caller,
            )
        return result


class SqliteDatabaseWrapper(SqliteExtDatabase):
    """SqliteExtDatabase that checks the calling thread on every statement.

    In debug mode (``g.get("DEBUG")`` is truthy) transactions created via
    atomic() are additionally wrapped in _TimedAtomic.
    """

    def execute_sql(self, *args, **kwargs):
        """Run _validate() on the arguments, then delegate to the parent."""
        _validate(*args, **kwargs)
        cursor = super().execute_sql(*args, **kwargs)
        return cursor

    def atomic(self, lock_type: str = "IMMEDIATE"):
        """Return the parent's atomic block, timed when DEBUG is enabled.

        ``lock_type`` is passed through to the parent's atomic() and
        defaults to "IMMEDIATE".
        """
        txn = super().atomic(lock_type)
        return _TimedAtomic(txn) if g.get("DEBUG") else txn


def reset(new_value=None):
    """Store the thread ident that _validate() will check against.

    The value is kept in thread-local storage.  A falsy ``new_value``
    (including the default ``None``) means "use the current thread's
    ident".

    Raises:
        OverridingReset: if a value has already been stored for the
            current thread.
    """
    already_set = hasattr(_thread_local_storage, "thread_ident_memo")
    if already_set:
        raise OverridingReset()

    ident = new_value if new_value else threading.get_ident()
    _thread_local_storage.thread_ident_memo = ident


def _validate(*args, **kwargs):
    """Log an error if the stored thread ident does not match this thread.

    Nothing is raised: a missing or mismatching ident is only reported
    through the module logger.  ``args``/``kwargs`` are used solely as
    context in the mismatch message.
    """
    expected = getattr(_thread_local_storage, "thread_ident_memo", None)

    if expected is None:
        logger.error("wrong thread or _validate() was not preceded by reset()")
        return

    actual = threading.get_ident()
    if expected != actual:
        logger.error(
            "thread_ident_memo check failed [%r != %r]\n"
            "context:\nargs: %s\nkwargs: %s",
            expected,
            actual,
            args,
            kwargs,
        )

Youez - 2016 - github.com/yon3zu
LinuXploit