| Server IP : 213.136.93.164 / Your IP : 216.73.216.20 Web Server : Apache System : Linux m14200.contabo.net 5.14.0-611.54.1.el9_7.x86_64 #1 SMP PREEMPT_DYNAMIC Wed May 6 18:03:03 EDT 2026 x86_64 User : ki692510 ( 1047) PHP Version : 7.4.33 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : OFF | Pkexec : OFF Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/ |
Upload File : |
class LineBufferOverflow(Exception):
pass
class LineBuffer(object):
"""
Allows to accumulate data, and than iterate over it getting tokens
split by line breaks '\n'. If at the end there is no line break,
the data will sit in the line buffer until more data with line
break comes in.
"""
MAX_SIZE = 16 * 1024 * 1024
def __init__(self):
self.buf = ""
def append(self, data):
if len(self.buf) + len(data) > self.MAX_SIZE:
self.buf = ""
raise LineBufferOverflow(
"LineBuffer exceeded maximum size of {} bytes".format(
self.MAX_SIZE
)
)
self.buf += data
def __iter__(self):
return self
def __next__(self):
pos = self.buf.find("\n")
if pos != -1:
result = self.buf[0:pos]
self.buf = self.buf[pos + 1 :]
return result
raise StopIteration
def clean(self):
self.buf = ""
class SizeBufferOverflow(Exception):
pass
class SizeBuffer:
MAX_SIZE = 16 * 1024 * 1024
def __init__(self, size_len=2):
self._buf = b""
self._size_len = size_len
def append(self, data):
if len(self._buf) + len(data) > self.MAX_SIZE:
self._buf = b""
raise SizeBufferOverflow(
"SizeBuffer exceeded maximum size of {} bytes".format(
self.MAX_SIZE
)
)
self._buf += data
def __iter__(self):
return self
def __next__(self):
if not self._buf:
raise StopIteration
size = int.from_bytes(self._buf[: self._size_len], "big")
if len(self._buf[self._size_len :]) >= size:
data = self._buf[self._size_len : self._size_len + size]
self._buf = self._buf[self._size_len + size :]
return data
raise StopIteration