Merge branch 'logger-accept-handlers'
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Wed, 10 Nov 2021 09:29:57 +0000 (10:29 +0100)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Wed, 10 Nov 2021 09:29:57 +0000 (10:29 +0100)
src/log_helpers.py
test/test_log_helpers.py [new file with mode: 0644]

index 3163971..b76a011 100644 (file)
@@ -43,6 +43,7 @@ Further ideas: ::
 
 import logging
 from logging import DEBUG, INFO, WARNING, ERROR, CRITICAL, NOTSET
+from logging.handlers import RotatingFileHandler
 from math import log10, floor
 import sys
 
@@ -159,7 +160,8 @@ def get_logger(name, *args, **kwargs):
 
 
 class I2nLogger:
-    """ a more convenient logger
+    """
+    A more convenient logger
 
     Features::
     * can be used without ".format()" as follows::
@@ -180,6 +182,8 @@ class I2nLogger:
       (assumes one line per call to log/debug/info/notice/...,
        only counts calls with priority above this logger's level)
     * provides shorter method names: dbg, note, warn, err
+    * adds a NullHandler if `streams` and `files` and `handlers` are all `None`
+    * convenience functions to add most frequently used handlers
 
     ..note:: Creating multiple instances with the same name is not allowed and
              will result in an error. Use :py:func:`get_logger` instead of this
@@ -191,9 +195,9 @@ class I2nLogger:
 
     def __init__(self, name, level=INFO, fmt=DEFAULT_SHORT_LEVEL_FORMAT,
                  datefmt=DEFAULT_SHORT_LEVEL_DATE_FORMAT,
-                 streams=STDOUT, files=None):
-        """ creates a I2nLogger; forwards args to logging.getLogger
-
+                 streams=STDOUT, files=None, handlers=None):
+        """
+        Creates a I2nLogger; forwards args to logging.getLogger
 
         :param str name: name of this logger, best practice is module name
         :param int level: best use one of the constants DEBUG, INFO NOTICE,
@@ -204,7 +208,13 @@ class I2nLogger:
                             :py:class:`ShortLevelFormatter` for more help
         :param streams: list/tuple of or a single stream to log to, default is
                         STDOUT (=sys.stdout)
+        :type streams: (:py:class:`io.IOBase`) or [:py:class:`io.IOBase`) or
+                       None
         :param files: list/tuple or single file name to log to
+        :type files: None or (str) or [str]
+        :param handlers: other handlers to customize and use with this logger
+        :type handlers: None or [:py:class:`logging.Handler`] or
+                        (:py:class:`logging.Handler`)
         :raise: ValueError if an I2nLogger with the same name exists already
         """
 
@@ -228,7 +238,7 @@ class I2nLogger:
 
         # create new handlers and formatter
         if streams is None:
-            stream = []
+            streams = []
         elif not isinstance(streams, (list, tuple)):
             streams = (streams, )
         for stream in streams:
@@ -251,6 +261,20 @@ class I2nLogger:
             new_handler.setLevel(self._level)
             self._log.addHandler(new_handler)
 
+        if handlers is None:
+            handlers = []
+        elif not isinstance(handlers, (list, tuple)):
+            handlers = (handlers, )
+        for handler in handlers:
+            formatter = ShortLevelFormatter(fmt=fmt, datefmt=datefmt)
+            formatter.add_level(NOTICE, 'note')
+            handler.setFormatter(formatter)
+            handler.setLevel(self._level)
+            self._log.addHandler(handler)
+
+        if not self._log.handlers:
+            self._log.addHandler(logging.NullHandler())
+
         # remember that this logger is a I2nLogger
         _i2n_loggers[name] = self
 
@@ -301,7 +325,7 @@ class I2nLogger:
             self._log.log(level, message_formatted, **kwargs)
 
     def _log_no_test(self, level, message, *args, **kwargs):
-        """ same as log() but without the isinstance test for internal use"""
+        """Same as log() but without the isinstance test for internal use."""
         if level >= self._level:
             try:
                 message_formatted = message.format(*args)
@@ -310,9 +334,10 @@ class I2nLogger:
             self._log.log(level, message_formatted, **kwargs)
 
     def log_count_if_interesting(self, count, level=INFO, counter_name=None):
-        """ Log value of a counter in gradually coarser intervals
+        """
+        Log value of a counter in gradually coarser intervals
 
-        see :py:func:`is_interesting_count` for definition of "interesting"
+        See :py:func:`is_interesting_count` for definition of "interesting".
         """
         if is_interesting_count(count):
             if counter_name:
@@ -321,15 +346,16 @@ class I2nLogger:
                 self.log(level, 'Counter is at {0}', count)
 
     def get_level(self):
-        """ return int level of this logger """
+        """Return int level of this logger."""
         return self._level
 
     def get_level_str(self):
-        """ returns :py:func:`logging.getLevelName` on :py:meth:`get_level` """
+        """Returns :py:func:`logging.getLevelName` on :py:meth:`get_level`."""
         return logging.getLevelName(self._level)
 
     def set_level(self, new_level):
-        """ set level given an int or a str
+        """
+        Set level given an int or a str.
 
         :arg new_level: int or str (str is converted to lower case)
         :raises: KeyError if new_level is a string that is not in
@@ -343,9 +369,53 @@ class I2nLogger:
         for handler in self._log.handlers:
             handler.setLevel(self._level)
 
+    def add_handler(self, *args, **kwargs):
+        return self._log.addHandler(*args, **kwargs)
+
+    def add_stream_handler(self, stream=sys.stdout, level='debug'):
+        """
+        Add a stream handler to the current logger that logs to given stream.
+
+        Note that the default here is stdout, not the python-default (stderr).
+        """
+        handler = logging.StreamHandler(stream)
+        handler.setLevel(self._level)
+        self._log.addHandler(handler)
+
+    def add_syslog_handler(self):
+        """Add a handler that logs to syslog."""
+        handler = SysLogHandler(address='/dev/log')
+        handler.setLevel(self._level)
+        self._log.addHandler(handler)
+
+    def add_file_handler(self, filename, mode='a', max_bytes=5*1024*1024,
+                         max_rotations=100, **kwargs):
+        """
+        Add a handler that logs to given file.
+
+        For convenience creates a
+        :py:class:`logging.handlers.RotatingFileHandler` to avoid huge log
+        files filling up the disc in case of error.
+
+        :param str mode: Mode with which to open log file; forwarded to handler
+        :param int max_bytes: Max size in bytes of a log file before it is
+                              renamed to `[filename].1` and logging continues
+                              with an empty `[filename]`.
+        :param int max_rots: Max number of rotated files to keep
+        :param dict kwargs: All other args (e.g. `encoding`) are forwarded to
+                            handler constructor. `maxBytes` and `backupCount`
+                            are overwritten with `max_bytes` and `max_rots`.
+        """
+        kwargs['maxBytes'] = max_bytes
+        kwargs['backupCount'] = max_rotations
+        handler = RotatingFileHandler(filename, mode, **kwargs)
+        handler.setLevel(self._level)
+        self._log.addHandler(handler)
+
 
 def n_digits(number):
-    """ returns the number of digits a number has in decimal format
+    """
+    Returns the number of digits a number has in decimal format
 
     :returns: 1 for 1...9, 2 for 10...99, 3 for 100...999, ...
               0 for 0 (and everything else beween -1 and 1)
@@ -356,8 +426,10 @@ def n_digits(number):
     else:
         return floor(log10(abs(number)))+1
 
+
 def is_interesting_count(counter):
-    """ return True if counter has reached an "interesting" value
+    """
+    Return True if counter has reached an "interesting" value
 
     For the counter to be "interesting" becomes ever harder. At first it is
     easy (returns True for 1,2,3,6,10), then becomes harder (True only for
@@ -373,76 +445,8 @@ def is_interesting_count(counter):
                 log('reached iteration {0}'.format(counter))
             counter += 1
 
-    Or implicitly using I2nLogger::log_count_if_interesting(counter)
+    Used by :py:meth:`I2nLogger::log_count_if_interesting`.
 
     :returns: True for a few values of counter, False most of the time
     """
-
     return float(counter) / 10.**(n_digits(counter)-1.) in (1., 2., 3., 6.)
-
-def test_short_level_format():
-    """ quick test of :py:class:`ShortLevelFormatter` """
-
-    logger = logging.getLogger('logtest')
-    logger.setLevel(DEBUG)
-    handler = logging.StreamHandler()
-    handler.setLevel(DEBUG)
-    formatter = ShortLevelFormatter(
-        '%(asctime)s:%(msecs)03d %(shortlevel)s| %(msg)s'
-        '   [regular levelname=%(levelname)s]',
-        datefmt='%H:%M:%S')
-    handler.setFormatter(formatter)
-    logger.addHandler(handler)
-
-    # 'application' code
-    logger.debug('debug message')
-    logger.info('info message')
-    logger.warn('warn message')
-    logger.error('error message')
-    logger.critical('critical message')
-    logger.log(15, 'unknown level')
-    logger.log(NOTSET, 'level not set')
-
-    # add notice level
-    notice = (logging.INFO + logging.WARNING)/2
-    formatter.add_level(notice, 'note')
-    logger.log(notice, 'more important than info but no warning nor error')
-
-    # try if exception formatting still works:
-    try:
-        logger.info('this is what an exception looks like:')
-        impossible_result = 1/0
-        logger.critical('just divided 1/0! The result is {0}'
-                        .format(impossible_result))
-    except ZeroDivisionError:
-        logger.exception('1/0 still does not work!', exc_info=True)
-
-    # done
-    logger.info('done testing')
-
-def test_get_logger():
-    log = get_logger('logger_test')
-    log2 = get_logger('logger_test')
-    print(log == log2)
-
-
-def test_line_counter():
-    log = get_logger('logger_test', max_lines=10)
-    for idx in range(20):
-        for _ in range(20):
-            log.debug('should not show nor count')
-        print('calling log for idx {0}'.format(idx))
-        log.info('logging with idx {0}', idx)
-        log.log_count_if_interesting(idx)
-
-
-def test_error_in_formatting():
-    log = get_logger('logger_test')
-    log.warn('forgot to add argument {} and {1} and {cnf} to format string')
-    log.warn('wrong number {} of arguments {}', 1)
-
-if __name__ == '__main__':
-    #test_short_level_format()
-    #test_get_logger()
-    #test_line_counter()
-    test_error_in_formatting()
diff --git a/test/test_log_helpers.py b/test/test_log_helpers.py
new file mode 100644 (file)
index 0000000..0f10dcd
--- /dev/null
@@ -0,0 +1,150 @@
+#!/usr/bin/env python3
+
+"""
+Test log_helpers.
+
+.. codeauthor:: Intra2net AG <info@intra2net.com>
+"""
+
+import sys
+import unittest
+import logging
+from tempfile import mkstemp
+import os
+from os.path import isfile, basename
+import re
+
+from src import log_helpers
+
+
+class LogHelperTest(unittest.TestCase):
+
+    def setUp(self):
+        """Create temp file to write log to."""
+        descriptor, self.temp_name = mkstemp(
+            prefix='pyi2ncommon-test-log-helpers-', suffix='.log', text=True)
+        self.temp_handle = os.fdopen(descriptor, 'at')
+
+    def tearDown(self):
+        """Remove temp log file."""
+        self.temp_handle.close()
+        if isfile(self.temp_name):
+            os.unlink(self.temp_name)
+
+    def get_logger_name(self):
+        """Return a unique name for a logger."""
+        return 'logtest-' + basename(self.temp_name)
+
+    def check_expected_contents(self, expected_contents):
+        """
+        Helper for other tests: open temp file, compare contents.
+
+        Ignores timestamp and line numbers
+        """
+        self.temp_handle.close()
+        with open(self.temp_name, 'rt') as log_reader:
+            for line_no, (actual_line, expect_line) in \
+                    enumerate(zip(log_reader, expected_contents.splitlines())):
+                if re.match(r'\d{1,2}:\d{2}:\d{2}:\d{3}\s+', actual_line):
+                    cmp_line = actual_line[13:].rstrip()
+                elif re.match(r'\s*File ".+", line \d+, in .+', actual_line):
+                    cmp_line = '  TRACEBACK LINE REPLACED'
+                else:
+                    cmp_line = actual_line.rstrip()
+                self.assertEqual(cmp_line, expect_line)
+
+    def test_short_level_format(self):
+        """Tests that calls to :py:class:`ShortLevelFormatter` do not fail."""
+        logger = logging.getLogger(self.get_logger_name())
+        logger.setLevel(logging.DEBUG)
+        handler = logging.StreamHandler(self.temp_handle)
+        handler.setLevel(logging.DEBUG)
+        formatter = log_helpers.ShortLevelFormatter(
+            '%(asctime)s:%(msecs)03d %(shortlevel)s| %(msg)s'
+            '   [regular levelname=%(levelname)s]',
+            datefmt='%H:%M:%S')
+        handler.setFormatter(formatter)
+        logger.addHandler(handler)
+
+        # 'application' code
+        logger.debug('debug message')
+        logger.info('info message')
+        logger.warning('warn message')
+        logger.error('error message')
+        logger.critical('critical message')
+        logger.log(15, 'unknown level')
+        logger.log(logging.NOTSET, 'level not set')
+
+        # add notice level
+        notice = (logging.INFO + logging.WARNING)//2
+        formatter.add_level(notice, 'note')
+        logger.log(notice, 'more important than info but no warning nor error')
+
+        # try if exception formatting still works:
+        try:
+            logger.info('this is what an exception looks like:')
+            impossible_result = 1/0
+            logger.critical('just divided 1/0! The result is {0}'
+                            .format(impossible_result))
+        except ZeroDivisionError:
+            logger.exception('1/0 still does not work!', exc_info=True)
+
+        # done
+        logger.info('done testing')
+
+        # now check the file
+        expected_contents = \
+            'dbug| debug message   [regular levelname=DEBUG]\n' \
+            'info| info message   [regular levelname=INFO]\n' \
+            'warn| warn message   [regular levelname=WARNING]\n' \
+            'err | error message   [regular levelname=ERROR]\n' \
+            'crit| critical message   [regular levelname=CRITICAL]\n' \
+            '????| unknown level   [regular levelname=Level 15]\n' \
+            'note| more important than info but no warning nor error   [regular levelname=NOTICE]\n' \
+            'info| this is what an exception looks like:   [regular levelname=INFO]\n' \
+            'err | 1/0 still does not work!   [regular levelname=ERROR]\n' \
+            'Traceback (most recent call last):\n' \
+            '  TRACEBACK LINE REPLACED\n' \
+            '    impossible_result = 1/0\n' \
+            'ZeroDivisionError: division by zero\n' \
+            'info| done testing   [regular levelname=INFO]\n'
+        self.check_expected_contents(expected_contents)
+
+    def test_get_logger(self):
+        log = log_helpers.get_logger('logger_test')
+        log2 = log_helpers.get_logger('logger_test')
+        self.assertEqual(log, log2)
+
+    def test_interesting_lines(self):
+        log = log_helpers.get_logger(self.get_logger_name(),
+                                     streams=self.temp_handle)
+        for idx in range(20):
+            log.log_count_if_interesting(idx)
+
+        expected_contents = 'info| Counter is at 1\n' \
+                            'info| Counter is at 2\n' \
+                            'info| Counter is at 3\n' \
+                            'info| Counter is at 6\n' \
+                            'info| Counter is at 10\n'
+        self.check_expected_contents(expected_contents)
+
+    def test_error_in_formatting(self):
+        """Ensure that forgotten args to not raise err but issue warning."""
+        self.check_expected_contents(expected_contents)
+
+    def test_error_in_formatting(self):
+        """Ensure that forgotten args to not raise err but issue warning."""
+        log = log_helpers.get_logger(self.get_logger_name(),
+                                     streams=self.temp_handle)
+        log.warn('forgot to add argument {} and {1} and {cnf} to format string')
+        log.warn('wrong number {} of arguments {}', 1)
+
+        expected_contents = \
+            'warn| [LOG ARGS!] forgot to add argument {} and {1} and {cnf} ' \
+            'to format string\n' \
+            'warn| [LOG ARGS!] wrong number {} of arguments {}\n'
+        self.check_expected_contents(expected_contents)
+
+
+if __name__ == '__main__':
+    unittest.main()