class CircularBuffer:
""" circular buffer for data; saves last N sets
-
+
can return or run func on them afterwards in correct order
public attributes (please read only!): buffer_size, n_items
class LogarithmicBuffer:
- """ saves only last N items, and after that less and less; old stuff is del
+ """ saves only last N items, and after that less and less old data
--> grows only logarithmically in size
- has a data_new which is a CircularBuffer for the last N items
+ has a data_new which is a CircularBuffer for the newest N items
data_old is a list of items with ages approx 2, 4, 8, 16, ... counts
Could easily extend to other bases than 2 to make even scarcer
if n_new < 0:
raise ValueError('n_new must be non-negative!')
if n_new == 0:
- self.data_new = None
+ self._data_new = None
else:
- self.data_new = CircularBuffer(n_new)
- self.count = 0
- self.data_old = []
+ self._data_new = CircularBuffer(n_new)
+ self._count = 0
+ self._data_old = []
def add(self, new_item):
""" add a new item to buffer """
- if self.data_new:
- newest_old_item = data_new.add(new_item)
- age = count - n_new
+ if self._data_new:
+ newest_old_item = self._data_new.add(new_item)
+ age = self._count - self.n_new
else:
newest_old_item = new_item
- age = count
+ age = self._count
- _save_old(newest_old_item, age, 0)
- self.count += 1
+ if age >= 0:
+ self._save_old(newest_old_item, age, 0)
+ self._count += 1
- def _save_old(item, age, index):
+ def _save_old(self, item, age, index):
""" internal helper for saving (or not) items in data_old """
# determine whether we throw it away or actually save it
if age % 2**index != 0:
return
- # we will save it. But before overwriting the item that was there,
- # we determine whether that will be saved at next index
- _save_old(data_old[index], age, index+1)
-
- # now do save at index, possibly extending data_old
- if len(data_old) < index:
- data_old.append(item)
+ # we save it. But before check if we need to extend data_old or
+ # save the item we would overwrite
+ if len(self._data_old) <= index:
+ self._data_old.append(item)
else:
- data_old[index] = item
+ self._save_old(self._data_old[index], age, index+1)
+ self._data_old[index] = item
def get_all(self):
""" get all buffer contents in correct order """
- if self.data_new:
- return self.data_old[::-1] + self.data_new.get_all()
+ if self._data_new:
+ return self._data_old[::-1] + self._data_new.get_all()
else:
- return self.data_old[::-1]
+ return self._data_old[::-1]
+
+ def get_size(self):
+ """ returns current number of saved elements in buffer
+
+ size is O(log n) where n = number of added items
+
+ more precisely: size is n_new + floor(log_2(m-1))+1
+ where: m = n-n_new > 1
+ (m=0 --> size=n_new; m=1 --> size=n_new+1)
+ """
+ return self.n_new + len(self._data_old)
"""
import unittest
+from math import log, floor
import buffers
self.assertTrue(all(elem == None for elem in elems[elem+1:]))
-#class LogarithmicBufferTest(unittest.TestCase):
-# """ test class LogarithmicBuffer """
-#
-# def test_some_func(self):
-# """ tests some_func """
-#
-# this_is_implemented = False
-# self.assertTrue(this_is_implemented, 'Appears not yet implemented')
-#
-# self.fail('Fail in any case, but with this nice message')
+def log2(x):
+ """ Log2 does not exist before python3.3 """
+ return log(x, 2)
+
+
+class LogarithmicBufferTest(unittest.TestCase):
+ """ test class LogarithmicBuffer """
+
+ def test_illegal_size(self):
+ for size in (-10, -1):
+ self.assertRaises(ValueError, buffers.LogarithmicBuffer, size)
+
+ def test_basic(self):
+ for size_new in (0, 1, 2, 10):
+ buf = buffers.LogarithmicBuffer(size_new)
+
+ for elem in range(1000):
+ buf.add(elem)
+
+ # test size
+ n_added = elem+1
+ n_in_old = n_added - size_new
+ if n_in_old <= 0:
+ self.assertEqual(buf.get_size(), size_new)
+ elif n_in_old == 1:
+ self.assertEqual(buf.get_size(), size_new + 1)
+ else:
+ self.assertEqual(buf.get_size(),
+ size_new + floor(log2(n_in_old-1))+1)
+
+ # test new contents really is the newest
+ start = max(0, elem-size_new)
+ expect_new = range(start, elem+1)
+ self.assertEqual(buf.get_all()[-len(expect_new):], expect_new)
if __name__ == '__main__':