finished LogarithmicBuffer, tested with new unittest
authorChristian Herdtweck <christian.herdtweck@intra2net.com>
Wed, 13 Jan 2016 09:45:54 +0000 (10:45 +0100)
committerChristian Herdtweck <christian.herdtweck@intra2net.com>
Wed, 13 Jan 2016 09:45:54 +0000 (10:45 +0100)
buffers.py
test/buffer_unittest.py

index b586820..d801555 100644 (file)
@@ -30,7 +30,7 @@ Featuring::
 
 class CircularBuffer:
     """ circular buffer for data; saves last N sets
-    
+
     can return or run func on them afterwards in correct order
 
     public attributes (please read only!): buffer_size, n_items
@@ -81,11 +81,11 @@ class CircularBuffer:
 
 
 class LogarithmicBuffer:
-    """ saves only last N items, and after that less and less; old stuff is del
+    """ saves only last N items, and after that less and less old data
 
     --> grows only logarithmically in size
 
-    has a data_new which is a CircularBuffer for the last N items
+    has a data_new which is a CircularBuffer for the newest N items
     data_old is a list of items with ages approx 2, 4, 8, 16, ... counts
 
     Could easily extend to other bases than 2 to make even scarcer
@@ -96,44 +96,54 @@ class LogarithmicBuffer:
         if n_new < 0:
             raise ValueError('n_new must be non-negative!')
         if n_new == 0:
-            self.data_new = None
+            self._data_new = None
         else:
-            self.data_new = CircularBuffer(n_new)
-        self.count = 0
-        self.data_old = []
+            self._data_new = CircularBuffer(n_new)
+        self._count = 0
+        self._data_old = []
 
     def add(self, new_item):
         """ add a new item to buffer """
-        if self.data_new:
-            newest_old_item = data_new.add(new_item)
-            age = count - n_new
+        if self._data_new:
+            newest_old_item = self._data_new.add(new_item)
+            age = self._count - self.n_new
         else:
             newest_old_item = new_item
-            age = count
+            age = self._count
 
-        _save_old(newest_old_item, age, 0)
-        self.count += 1
+        if age >= 0:
+            self._save_old(newest_old_item, age, 0)
+        self._count += 1
 
-    def _save_old(item, age, index):
+    def _save_old(self, item, age, index):
         """ internal helper for saving (or not) items in data_old """
 
         # determine whether we throw it away or actually save it
         if age % 2**index != 0:
             return
 
-        # we will save it. But before overwriting the item that was there,
-        # we determine whether that will be saved at next index
-        _save_old(data_old[index], age, index+1)
-
-        # now do save at index, possibly extending data_old
-        if len(data_old) < index:
-            data_old.append(item)
+        # we save it. But before check if we need to extend data_old or
+        # save the item we would overwrite
+        if len(self._data_old) <= index:
+            self._data_old.append(item)
         else:
-            data_old[index] = item
+            self._save_old(self._data_old[index], age, index+1)
+            self._data_old[index] = item
 
     def get_all(self):
         """ get all buffer contents in correct order """
-        if self.data_new:
-            return self.data_old[::-1] + self.data_new.get_all()
+        if self._data_new:
+            return self._data_old[::-1] + self._data_new.get_all()
         else:
-            return self.data_old[::-1]
+            return self._data_old[::-1]
+
+    def get_size(self):
+        """ returns current number of saved elements in buffer
+
+        size is O(log n) where n = number of added items
+
+        more precisely: size is n_new + floor(log_2(m-1))+1
+                 where: m = n-n_new > 1
+                        (m=0 --> size=n_new;   m=1 --> size=n_new+1)
+        """
+        return self.n_new + len(self._data_old)
index 9871a04..3c16ed0 100644 (file)
@@ -10,6 +10,7 @@ For help see :py:mod:`unittest`
 """
 
 import unittest
+from math import log, floor
 
 import buffers
 
@@ -36,16 +37,40 @@ class CircBufferTester(unittest.TestCase):
                 self.assertTrue(all(elem == None for elem in elems[elem+1:]))
 
 
-#class LogarithmicBufferTest(unittest.TestCase):
-#    """ test class LogarithmicBuffer """
-#
-#    def test_some_func(self):
-#        """ tests some_func """
-#
-#        this_is_implemented = False
-#        self.assertTrue(this_is_implemented, 'Appears not yet implemented')
-#
-#        self.fail('Fail in any case, but with this nice message')
+def log2(x):
+    """ Log2 does not exist before python3.3 """
+    return log(x, 2)
+
+
+class LogarithmicBufferTest(unittest.TestCase):
+    """ test class LogarithmicBuffer """
+
+    def test_illegal_size(self):
+        for size in (-10, -1):
+            self.assertRaises(ValueError, buffers.LogarithmicBuffer, size)
+
+    def test_basic(self):
+        for size_new in (0, 1, 2, 10):
+            buf = buffers.LogarithmicBuffer(size_new)
+
+            for elem in range(1000):
+                buf.add(elem)
+
+                # test size
+                n_added = elem+1
+                n_in_old = n_added - size_new
+                if n_in_old <= 0:
+                    self.assertEqual(buf.get_size(), size_new)
+                elif n_in_old == 1:
+                    self.assertEqual(buf.get_size(), size_new + 1)
+                else:
+                    self.assertEqual(buf.get_size(),
+                                     size_new + floor(log2(n_in_old-1))+1)
+
+                # test new contents really is the newest
+                start = max(0, elem-size_new)
+                expect_new = range(start, elem+1)
+                self.assertEqual(buf.get_all()[-len(expect_new):], expect_new)
 
 
 if __name__ == '__main__':