Skip to content

Commit

Permalink
Added log output trim
Browse files Browse the repository at this point in the history
  • Loading branch information
matllubos committed Nov 10, 2022
1 parent 69e9974 commit 8ae9f93
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 14 deletions.
9 changes: 9 additions & 0 deletions docs/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,12 @@ Setup
.. attribute:: SECURITY_LOG_STRING_IO_FLUSH_TIMEOUT

Timeout which set how often will be stored output stream to the log. Default value is ``5`` (s).

.. attribute:: SECURITY_LOG_STRING_OUTPUT_TRUNCATE_LENGTH

Max length of log output string. Default value is ``10000``.

.. attribute:: SECURITY_LOG_STRING_OUTPUT_TRUNCATE_OFFSET

Because too frequent string truncation can cause high CPU load, log string is truncated by more characters. This setting defines this value which is by default ``1000``.

140 changes: 139 additions & 1 deletion example/apps/test_security/tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@
import uuid
import decimal

from django.utils.timezone import now

from germanium.test_cases.default import GermaniumTestCase
from germanium.tools import assert_equal
from germanium.decorators import data_consumer

from freezegun import freeze_time

from django.test import override_settings

from security.utils import serialize_data, deserialize_data
from security.utils import serialize_data, deserialize_data, truncate_lines_from_left, LogStringIO


class UtilsTestCase(GermaniumTestCase):
Expand Down Expand Up @@ -33,3 +40,134 @@ def test_utils_serialize_and_deserialize_data_should_return_right_values(self):
}
assert_equal(serialize_data(data), serialized_data)
assert_equal(deserialize_data(serialized_data), data)

@data_consumer(
(
('test', 'test', 10),
('long truncated value without newline', '…\n', 10),
('long truncated value with\nnewline', '…\nnewline', 10),
('long truncated value with\nnewline', '…\n', 5),
('long truncated value with newline after end \n', '…\n', 5)
)
)
def test_truncate_lines_from_left_should_truncate_text(self, original_text, truncated_text, length):
assert_equal(truncate_lines_from_left(original_text, length), truncated_text)

@override_settings(
SECURITY_LOG_STRING_OUTPUT_TRUNCATE_LENGTH=100,
SECURITY_LOG_STRING_OUTPUT_TRUNCATE_OFFSET=10
)
def test_log_string_io_should_be_truncated_according_to_settings(self):
log_string_io = LogStringIO(write_time=False)
for _ in range(20):
log_string_io.write((4*'a') + '\n')
assert_equal(log_string_io.getvalue(), 20*(4*'a' + '\n'))
assert_equal(len(log_string_io.getvalue()), 100)
log_string_io.write('b')
assert_equal(log_string_io.getvalue(), '…\n' + 17*(4*'a' + '\n') + 'b')

@override_settings(
SECURITY_LOG_STRING_OUTPUT_TRUNCATE_LENGTH=100,
SECURITY_LOG_STRING_OUTPUT_TRUNCATE_OFFSET=10
)
def test_log_string_io_should_remove_line_with_cr_lf(self):
log_string_io = LogStringIO(write_time=False)
log_string_io.write('text before newline\n')
log_string_io.write('removed text')
assert_equal(log_string_io.getvalue(), 'text before newline\nremoved text')
log_string_io.write('\rnew text')
assert_equal(log_string_io.getvalue(), 'text before newline\nnew text')

@override_settings(
SECURITY_LOG_STRING_OUTPUT_TRUNCATE_LENGTH=100,
SECURITY_LOG_STRING_OUTPUT_TRUNCATE_OFFSET=10
)
def test_log_string_io_should_truncate_and_remove_last_line(self):
log_string_io = LogStringIO(write_time=False)
log_string_io.write('very long text which should be removed\n')
log_string_io.write('very long text which shouldn\'t be removed\n')
assert_equal(
log_string_io.getvalue(),
'very long text which should be removed\n'
'very long text which shouldn\'t be removed\n'
)
log_string_io.write('text which should be removed with cr lf')
assert_equal(
log_string_io.getvalue(),
'…\n'
'very long text which shouldn\'t be removed\n'
'text which should be removed with cr lf'
)

log_string_io.write('\rnew text')
assert_equal(
log_string_io.getvalue(),
'…\n'
'very long text which shouldn\'t be removed\n'
'new text'
)

log_string_io.write('\rnew very long text which should remove prev text' + 9*' ')
assert_equal(
log_string_io.getvalue(),
'…\n'
'new very long text which should remove prev text' + 9*' '
)

log_string_io.write('\rnew very long text which should be removed at all' + 50*' ')
assert_equal(
log_string_io.getvalue(),
'…\n'
)

@freeze_time(now())
def test_log_string_io_should_add_time_and_log_to_output(self):
time_string = f'\x1b[0m\x1b[1m[{{}}] [{now().strftime("%d-%m-%Y %H:%M:%S")}]\x1b[0m '

log_string_io = LogStringIO()
log_string_io.write('test')
assert_equal(log_string_io.getvalue(), time_string.format(1) + 'test')

log_string_io.write(' test')
assert_equal(log_string_io.getvalue(), time_string.format(1) + 'test test')

log_string_io.write('\ntest')
assert_equal(
log_string_io.getvalue(),
time_string.format(1) + 'test test\n'
+ time_string.format(2) + 'test'
)

@override_settings(
SECURITY_LOG_STRING_OUTPUT_TRUNCATE_LENGTH=110,
SECURITY_LOG_STRING_OUTPUT_TRUNCATE_OFFSET=10
)
@freeze_time(now())
def test_log_string_io_should_add_time_and_log_to_output_but_remove_with_cr_lf(self):
time_string = f'\x1b[0m\x1b[1m[{{}}] [{now().strftime("%d-%m-%Y %H:%M:%S")}]\x1b[0m '

log_string_io = LogStringIO()
log_string_io.write('long text that should be removed \n')
assert_equal(
log_string_io.getvalue(),
time_string.format(1) + 'long text that should be removed \n' + time_string.format(2)
)

log_string_io.write('new text')
assert_equal(
log_string_io.getvalue(),
'…\n'
+ time_string.format(2) + 'new text'
)

log_string_io.write('write very long text' + 100*' ')
assert_equal(
log_string_io.getvalue(),
'…\n' + time_string.format(3)
)

log_string_io.write('new text')
assert_equal(
log_string_io.getvalue(),
'…\n' + time_string.format(3) + 'new text'
)
6 changes: 4 additions & 2 deletions security/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
'THROTTLING_FAILURE_VIEW': 'security.throttling.views.throttling_failure_view',
'LOG_REQUEST_IGNORE_IP': (),
'LOG_REQUEST_IGNORE_URL_PATHS': (),
'LOG_REQUEST_BODY_LENGTH': 1000,
'LOG_RESPONSE_BODY_LENGTH': 1000,
'LOG_REQUEST_BODY_LENGTH': 10000,
'LOG_RESPONSE_BODY_LENGTH': 10000,
'LOG_RESPONSE_BODY_CONTENT_TYPES': (
'application/json', 'application/xml', 'text/xml', 'text/csv',
),
Expand Down Expand Up @@ -70,6 +70,8 @@
'LOG_MAX_REVISIONS_COUNT': 20,
'RAISE_WRITER_EXCEPTIONS': False,
'THROTTLING_ENABLED': True,
'LOG_STRING_OUTPUT_TRUNCATE_LENGTH': 10000,
'LOG_STRING_OUTPUT_TRUNCATE_OFFSET': 1000,
}


Expand Down
73 changes: 62 additions & 11 deletions security/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,38 @@ def is_atty_string(s):
return bool(re.search(r'^' + re.escape('\x1b[') + r'\d+m$', s))


def truncate_lines_from_left(value, max_length):
"""
Firstly text is truncated according to max_length.
Next step is find next character '\n' and while text before \n is removed. On the start is added '…'
If there is only one line (without \n) which is longer than max_length. Whole line is removed and replaced with …\n
"""
if len(value) <= max_length:
return value

# value 2 is added because of …\n
truncated_value = value[len(value) + 2 - max_length:]
truncated_value_split_by_newline = truncated_value.split('\n', 1)
return f'…\n{"" if len(truncated_value_split_by_newline) == 1 else truncated_value_split_by_newline[1]}'


class LogStringIO(StringIO):

BOLD = '\x1b[1m'
RESET = '\x1b[0m'

def __init__(self, flush_callback=None, flush_timeout=None, *args, **kwargs):
def __init__(self, flush_callback=None, flush_timeout=None, write_time=True, *args, **kwargs):
super().__init__(*args, **kwargs)
self._is_closed = False
self._last_newline = 0
self._flush_callback = flush_callback
self._write_time = True
self._write_time = write_time
self._write_time_to_line = self._write_time
self._last_flush_time = time()
self._flush_timeout = settings.LOG_STRING_IO_FLUSH_TIMEOUT if flush_timeout is None else flush_timeout
self._flushed = True
self._next_line_number = 1

def _flush(self, force=False):
if (not self._flushed and self._flush_callback
Expand All @@ -54,6 +72,18 @@ def _flush(self, force=False):
self._last_flush_time = time()
self._flushed = True

def _write_line_number_and_time(self):
if self._write_time:
super().write('{}{}[{}] [{}]{} '.format(
self.RESET,
self.BOLD,
self._next_line_number,
now().strftime('%d-%m-%Y %H:%M:%S'),
self.RESET
))
self._write_time_to_line = False
self._next_line_number += 1

def write(self, s):
if not self._is_closed:
self._flushed = False
Expand All @@ -65,24 +95,45 @@ def write(self, s):
if not cursor_first:
self.seek(self._last_newline)
self.truncate()
self._write_time = True
self._write_time_to_line = self._write_time
cursor_first = False

if self._write_time and not is_atty_string(cursor_block):
cursor_block = '{}{}[{}]{} {}'.format(
self.RESET, self.BOLD, now().strftime('%d-%m-%Y %H:%M:%S'), self.RESET, cursor_block
)
self._write_time = False

if self._write_time_to_line and not is_atty_string(cursor_block):
self._write_line_number_and_time()
super().write(cursor_block)

if i != len(lines) - 1:
super().write('\n')
self._last_newline = self.tell()
self._write_time = True
self._write_time_to_line = self._write_time

self._truncate_by_max_length()
self._flush()

def _truncate_by_max_length(self):
"""
StringIO value is truncated from left side to maximal length defined in LOG_STRING_OUTPUT_TRUNCATE_LENGTH.
Because too frequent string truncation can cause high CPU load, log string is truncated by more characters.
The value is defined in LOG_STRING_OUTPUT_TRUNCATE_OFFSET. Eg
LOG_STRING_OUTPUT_TRUNCATE_LENGTH = 10000
LOG_STRING_OUTPUT_TRUNCATE_OFFSET = 1000
if text is longer than 10000 character is truncated to 9000 characters.
To better readability the text is further truncated to the following newline.
"""
if self.tell() > settings.LOG_STRING_OUTPUT_TRUNCATE_LENGTH:
original_value = self.getvalue()
truncated_value = truncate_lines_from_left(
self.getvalue(),
settings.LOG_STRING_OUTPUT_TRUNCATE_LENGTH - settings.LOG_STRING_OUTPUT_TRUNCATE_OFFSET
)
self._last_newline = self._last_newline - (len(original_value) - len(truncated_value))
self.seek(0)
self.truncate()
super().write(truncated_value)
if truncated_value.endswith('\n'):
self._write_line_number_and_time()

def isatty(self):
return True

Expand Down

0 comments on commit 8ae9f93

Please sign in to comment.