I have developed the following code to hold the n latest items it has received and to return those n items when asked. The interface is:

public interface ICircularBuffer<T> {
        void Put(T item);  // put an item
        T[] Read(); // returns the "n" latest items, newest first
}

The idea is to build a buffer that will hold "n" latest items that have been put and return those items when asked for.

For example:

var buf = new ConcuurentCircularBuffer<int>(3); // creates a buffer of size = 3
buf.Put(15);
buf.Put(10);
buf.Put(20);

int[] arr = buf.Read(); // returns - 20,10,15

buf.Put(25);
arr = buf.Read(); // returns - 25,20,10

The implementation uses a lock and an array to represent the circular buffer:

public class ConcuurentCircularBuffer<T> : ICircularBuffer<T> {

        private T[] buffer;
        private int last = 0;
        private int sz;
        private object lockObject = new object();

        public ConcuurentCircularBuffer(int sz) {
            // array index starts at 1
            this.sz = sz;
            buffer = new T[sz + 1];
        }

        public void Put(T item) {
            lock (lockObject) {
                last++;
                last = last > sz ? 1 : last;
                buffer[last] = item;
            }
        }

        public T[] Read() {
            T[] arr = new T[sz];

            lock (lockObject) {
                int iterator = 0;
                for (int read = 0; read < sz; read++) {
                    int index = last - iterator;
                    index = index <= 0 ? (sz + index) : index;
                    if (buffer[index] != null) {
                        arr[iterator] = buffer[index];
                    } else {
                        break;
                    }
                    iterator++;
                }
            }
            return arr;
        }
    }

The unit tests I am using to check the performance are as follows:

[TestMethod()]
        public void TestParallelPut() {
            int sz = 10;
            ConcuurentCircularBuffer<int> buf = new ConcuurentCircularBuffer<int>(sz);

            Stopwatch watch = new Stopwatch();
            ManualResetEventSlim evt = new ManualResetEventSlim(false);

            int nTimes = 100000;

            Task[] tasks = new Task[nTimes];
            watch.Start();
            for (int i = 0; i < nTimes; i++) {
                tasks[i] = Task.Run(() => {
                    evt.Wait();
                    buf.Put(i);
                });
            }

            // signal all to proceed
            evt.Set();
            Task.WaitAll(tasks);

            watch.Stop();
            double timeMs = watch.Elapsed.TotalMilliseconds;
            TestContext.WriteLine("Buffer size: {0}", sz);
            TestContext.WriteLine("Average time per put: {0} ms, total time taken for {1} parallel requests: {2} ms",timeMs/nTimes,nTimes,timeMs);

    }

This test tries to simulate maximum contention by putting all n items in parallel.

The following test puts n items in parallel while another task keeps reading in parallel:

[TestMethod()]
        public void TestParallelPutAndRead() {
            int sz = 10;
            ConcuurentCircularBuffer<int> buf = new ConcuurentCircularBuffer<int>(sz);

            Stopwatch watch = new Stopwatch();
            ManualResetEventSlim evt = new ManualResetEventSlim(false);
            ManualResetEventSlim completeEvt = new ManualResetEventSlim(false);
            int nTimes = 100000;

            Task[] tasks = new Task[nTimes];
            watch.Start();
            for (int i = 0; i < nTimes; i++) {
                tasks[i] = Task.Run(() => {
                    evt.Wait();
                    buf.Put(i);
                });
            }

            // start reading, until told to stop
            Task.Run(() => {
                for(;;) {
                    if (completeEvt.IsSet) {
                        break;
                    } else {
                        int[] arr = buf.Read();
                        Assert.IsTrue(arr.Length == sz);
                    }
                }
            });

            // signal all to proceed
            evt.Set();
            Task.WaitAll(tasks);
            completeEvt.Set();

            watch.Stop();
            double timeMs = watch.Elapsed.TotalMilliseconds;
            TestContext.WriteLine("Test ParallelPut and Read - Buffer size: {0}", sz);
            TestContext.WriteLine("Average time per put: {0} ms, total time taken for {1} parallel requests: {2} ms", timeMs / nTimes, nTimes, timeMs);

        }

The following test starts "nTasks" tasks in parallel, and each task puts nTimes/nTasks items. Another task keeps reading as well:

[TestMethod()]
        public void TestPutAndNParallelAndRead() {
            int sz = 10;
            ConcuurentCircularBuffer<int> buf = new ConcuurentCircularBuffer<int>(sz);

            Stopwatch watch = new Stopwatch();
            ManualResetEventSlim evt = new ManualResetEventSlim(false);
            ManualResetEventSlim completeEvt = new ManualResetEventSlim(false);
            int nTimes = 1000000;
            int nTasks = 100;

            Task[] tasks = new Task[nTasks];
            watch.Start();
            for (int i = 0; i < nTasks; i++) {
                tasks[i] = Task.Run(() => {
                    evt.Wait();
                    for (int n = 0; n < nTimes / nTasks; n++) {
                        buf.Put(i);
                    }
                });
            }

            // start reading, until told to stop
            Task.Run(() => {
                for (;;) {
                    if (completeEvt.IsSet) {
                        break;
                    } else {
                        int[] arr = buf.Read();
                        Assert.IsTrue(arr.Length == sz);
                    }
                }
            });

            // signal all to proceed
            evt.Set();
            Task.WaitAll(tasks);
            completeEvt.Set();

            watch.Stop();
            double timeMs = watch.Elapsed.TotalMilliseconds;
            TestContext.WriteLine("Test Put and {0} Parallel Put and Read - Buffer size: {1}", nTasks,sz);
            TestContext.WriteLine("Average time per put: {0} ms, total time taken for {1} requests: {2} ms", timeMs / nTimes, nTimes, timeMs);

        }

The test results are as follows on Windows 7 Enterprise with an Intel Core i5-4310U CPU @ 2.00 GHz:

Test Name:  TestParallelPut 
TestContext Messages: 
Test ParallelPut - Buffer size: 10 Average time per put: 0.002393078 ms, total time taken for 100000 parallel requests: 239.3078 ms

Test Name:  TestParallelPutAndRead 
Test Outcome:   Passed 
TestContext Messages: Test ParallelPut and Read - Buffer size: 10 Average time per put: 0.002982335 ms, total time taken for 100000 parallel requests: 298.2335 ms


Test Name:  TestPutAndNParallelAndRead 
Test Outcome:   Passed 
TestContext Messages: Test Put and 100 Parallel Put and Read - Buffer size: 10 Average time per put: 0.0003211726 ms, total time taken for 1000000 requests: 321.1726 ms

Is there any alternative approach (with better performance) that can be followed to achieve the required functionality?

Please review the code and tests.

What's the use case for this container? Is the size 10 used by your tests a realistic size? You don't empty the buffer when you read from it, is it really OK for two successive reads to return overlapping data sets? – forsvarir 8 hours ago
    
The size could range from 10 to 1000. No two successive reads should not return overlapping data, every read should get "N" latest items – Ngm 7 hours ago
    
@Ngm If you return the N latest items without having added at least N new items in the mean time you will get overlapping data. – CodesInChaos 7 hours ago
    
Apologies, it can return overlapping data, the idea is: it should return the "N" latest items (latest one first order). – Ngm 7 hours ago
    
@Ngm please update the description in your question to reflect that. – forsvarir 7 hours ago

Naming

  • There is one 'u' in ConcuurentCircularBuffer that wants to be an 'r' --> ConcurrentCircularBuffer
  • There is no need to abbreviate names like size (sz --> size).
  • I would rename last to something more descriptive, like lastIndex.

Code Style

  • sz, buffer and lockObject should be read-only.
  • There is no need to skip the first array element; just start with 0.
  • iterator always has the same value as read. Therefore you can drop it.

Behavior

  • The array returned by Read always has the same size as the buffer, independent of the actual number of items. To avoid that, consider changing the type of arr to List<T>.
  • If one of the buffer's elements is null, you assume that the end of the buffer has been reached. Therefore, circularBuffer.Put(null) would clear the buffer. That is unexpected behavior. Suggestion: don't allow null values to be put into the buffer (throw ArgumentNullException) and add a Clear method if needed.
  • When using a primitive type like int for T, the Read method returns an array with the size of the buffer filled with the default value, even if no item was added. That is also a little bit strange... I suppose you need something like a count property that stores the number of added items.
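
The points above could be combined roughly as follows. This is only a minimal sketch, not the original poster's code: the class name CountingCircularBuffer and the members capacity, nextIndex and count are hypothetical names chosen for illustration. It assumes that tracking a count is acceptable, which also means null no longer has to act as an end marker.

```csharp
using System;

// Hypothetical sketch: CountingCircularBuffer, capacity, nextIndex and count
// are illustrative names that do not appear in the original post.
public class CountingCircularBuffer<T>
{
    private readonly T[] buffer;                 // zero-based, no skipped slot
    private readonly object lockObject = new object();
    private int nextIndex;                       // slot the next Put writes to
    private int count;                           // items stored, capped at capacity

    public CountingCircularBuffer(int capacity)
    {
        if (capacity <= 0)
            throw new ArgumentOutOfRangeException(nameof(capacity));
        buffer = new T[capacity];
    }

    public void Put(T item)
    {
        lock (lockObject)
        {
            buffer[nextIndex] = item;
            nextIndex = (nextIndex + 1) % buffer.Length;
            if (count < buffer.Length) count++;
        }
    }

    // Returns only the items actually stored, latest first.
    public T[] Read()
    {
        lock (lockObject)
        {
            T[] result = new T[count];
            for (int i = 0; i < count; i++)
            {
                // Step backwards from the most recently written slot.
                int index = (nextIndex - 1 - i + buffer.Length) % buffer.Length;
                result[i] = buffer[index];
            }
            return result;
        }
    }
}
```

With a capacity of 3, calling Put(15), Put(10), Put(20) and then Read() yields 20, 10, 15, and a Read() on a fresh buffer yields an empty array instead of default values.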

In addition to @JanDotNet answer.
You can use a "less strict" lock in your code, namely ReaderWriterLock or ReaderWriterLockSlim. As far as I can see from your code, Read can safely run on several threads in parallel, but Put and Read cannot run in parallel. These classes are made for exactly this case, and they may improve the overall performance.
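
As a rough illustration of that idea, here is a sketch using ReaderWriterLockSlim. The class name RwLockCircularBuffer and its fields are hypothetical, and the zero-based layout with a count is an assumption rather than the original implementation. Whether this is actually faster than a plain lock depends on the ratio of reads to writes, so it should be benchmarked.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: RwLockCircularBuffer and its member names are
// illustrative and not taken from the original post.
public class RwLockCircularBuffer<T>
{
    private readonly T[] buffer;
    private readonly ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim();
    private int nextIndex; // slot the next Put writes to
    private int count;     // items stored, capped at capacity

    public RwLockCircularBuffer(int capacity)
    {
        if (capacity <= 0)
            throw new ArgumentOutOfRangeException(nameof(capacity));
        buffer = new T[capacity];
    }

    public void Put(T item)
    {
        // Writers get exclusive access.
        rwLock.EnterWriteLock();
        try
        {
            buffer[nextIndex] = item;
            nextIndex = (nextIndex + 1) % buffer.Length;
            if (count < buffer.Length) count++;
        }
        finally
        {
            rwLock.ExitWriteLock();
        }
    }

    public T[] Read()
    {
        // Several readers may hold the read lock at the same time.
        rwLock.EnterReadLock();
        try
        {
            T[] result = new T[count];
            for (int i = 0; i < count; i++)
            {
                int index = (nextIndex - 1 - i + buffer.Length) % buffer.Length;
                result[i] = buffer[index];
            }
            return result;
        }
        finally
        {
            rwLock.ExitReadLock();
        }
    }
}
```

The try/finally blocks make sure the lock is released even if an exception is thrown while it is held.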

Wow why did I not do this? good comment – Ngm 8 hours ago
    
If the comment is good, then mark it as helpful :) – Disappointed 8 hours ago
Depending on the access patterns the higher overhead of RW locks might actually decrease performance. Make sure to benchmark it for your use-case. – CodesInChaos 7 hours ago
    
@CodesInChaos Yes, you are right. ReadWriteLock might not improve performance. But even without performance it looks more natural here – Disappointed 7 hours ago

You can make your code much simpler if you use a LinkedList<T> instead of an array. Inserting and removing items at the ends of the list, which is all that is needed in this case, are both O(1) operations:

using System.Collections.Generic;
using System.Linq; // for ToArray()

public class ConcurrentCircularBuffer<T> : ICircularBuffer<T>
{
    private readonly LinkedList<T> _buffer;
    private readonly int _maxItemCount;

    public ConcurrentCircularBuffer(int maxItemCount)
    {
        _maxItemCount = maxItemCount;
        _buffer = new LinkedList<T>();
    }

    public void Put(T item)
    {
        lock (_buffer)
        {
            // newest item goes to the front, oldest one drops off the end
            _buffer.AddFirst(item);
            if (_buffer.Count > _maxItemCount)
            {
                _buffer.RemoveLast();
            }
        }
    }

    // returns T[] so that it matches the ICircularBuffer<T> interface
    public T[] Read()
    {
        lock(_buffer) { return _buffer.ToArray(); }
    }
}

MSDN:

LinkedList.RemoveLast Method ()

LinkedList.AddFirst Method (T)

I would prefer that solution because the implementation is simpler. But even if the performance difference isn't significant, I can't imagine that the LinkedList has a better performance compared to a raw array buffer. – JanDotNet 5 hours ago
+1 It's always astounding to see the difference that a data type can make to the complexity of these things :) – RobH 4 hours ago
    
I agree, I think this implementation is way better. Unless some actual performance issues are encountered, i think readability should come first. You can also use Queue instead of LinkedList: it has simpler api, IMHO. – Nikita B 3 hours ago
@NikitaB I started with a Queue but then you would need to use a Reverse() for reading and it can cost some time for larger collections. – t3chb0t 3 hours ago
    
@t3chb0t , ah, yes, I've missed the fact, that last added elements should go first. LinkedList is definitely better in that case. – Nikita B 3 hours ago

Just scratching the surface of your code

  1. Your interface ICircularBuffer<T> breaks SRP / Interface Segregation. This interface is responsible for both putting and reading items.
  2. There is a lot of repetition in your test methods. One principle to follow in programming is DRY (don't repeat yourself). Instead of defining ConcuurentCircularBuffer<int>, Stopwatch and ManualResetEventSlim in every test, set them up once, e.g.

    private ConcuurentCircularBuffer<int> buf;
    private Stopwatch watch;
    private ManualResetEventSlim evt;
    private ManualResetEventSlim completeEvt;

    [TestInitialize]
    public void Initialize() {
        // assign fields (not locals) so that every test method can use them
        int sz = 10;
        buf = new ConcuurentCircularBuffer<int>(sz);
        watch = new Stopwatch();
        evt = new ManualResetEventSlim(false);
        completeEvt = new ManualResetEventSlim(false);
    }
    
  3. You could use a TestCleanup method to set your objects to null, e.g.

    [TestCleanup()]
    public void Cleanup()
    { /* assign null to the objects */ }
    
  4. The naming convention for your test methods can be improved. Most unit testers use this convention (Naming Standard for Unit test by Roy Osherove):

    [UnitOfWork_StateUnderTest_ExpectedBehavior]
    
  5. A possible bug could occur if a negative number is passed as a parameter to the constructor, e.g. -1:

    public ConcuurentCircularBuffer(int sz) {
        // array index starts at 1
        this.sz = sz;
        buffer = new T[sz + 1];
    }
    

    You want to check that the buffer length is > 0; otherwise some computations should not be performed.

  6. Before progressing to parallel Put and parallel Read, you should test that one, two and three items are correctly added to and read from the buffer. In NUnit you can use the Assert class to check whether the items returned are equal to what's stored in the buffer.
The OP is using the Microsoft testing framework; however, Assert is still available. – forsvarir 8 hours ago
5: int is a value type and so cannot be null. Testing for negative numbers could be a great idea though. – RobH 7 hours ago
    
@RobH yeah right – Tolani Jaiye-Tikolo 6 hours ago
@TolaniJaiye-Tikolo: Interface Segregation means that (big) interfaces should be split to become more client-specific. Therefore, it is OK to have multiple methods (or responsibilities) in one interface. See also ISet, IList, ... from the .NET Framework. – JanDotNet 6 hours ago
    
@JanDotNet thanks mates I will update my answer accordingly – Tolani Jaiye-Tikolo 5 hours ago

I just wanted to add that for large buffers, Buffer.BlockCopy (or Array.Copy or other similar methods) will give much better performance than element-by-element copying.

Also this:

// array index starts at 1
buffer = new T[sz + 1];

looks like an accident waiting to happen. The entire framework is built around zero-based indexes. I would not recommend having it any other way in your collections. Otherwise it is really easy to make a mistake: your program will have hundreds of arrays flying around, where most of them will be regular arrays, except for that one special array, where you have to skip the first element and subtract 1 from Length before doing any operations (including LINQ queries!). At least make a custom array class, which would override Length and would throw an exception when you access index 0. Or save yourself the trouble and just use a regular array, and subtract 1 from the index when reading from it.
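
Both suggestions could be sketched together like this. It is only one possible layout, under assumptions that are not in the original code: the class name ArrayCopyCircularBuffer and the fields head and count are hypothetical, and Put writes towards lower indexes so that the newest-first order is already contiguous in the array. Array.Copy is used rather than Buffer.BlockCopy because Buffer.BlockCopy only accepts arrays of primitive types, which a generic T[] is not guaranteed to be.

```csharp
using System;

// Hypothetical sketch: ArrayCopyCircularBuffer, head and count are
// illustrative names that do not appear in the original post.
public class ArrayCopyCircularBuffer<T>
{
    private readonly T[] buffer;  // regular zero-based array
    private readonly object lockObject = new object();
    private int head;             // index of the most recent item
    private int count;            // items stored, capped at capacity

    public ArrayCopyCircularBuffer(int capacity)
    {
        if (capacity <= 0)
            throw new ArgumentOutOfRangeException(nameof(capacity));
        buffer = new T[capacity];
    }

    public void Put(T item)
    {
        lock (lockObject)
        {
            // Move head one slot down, wrapping around to the end of the array.
            head = (head - 1 + buffer.Length) % buffer.Length;
            buffer[head] = item;
            if (count < buffer.Length) count++;
        }
    }

    public T[] Read()
    {
        lock (lockObject)
        {
            T[] result = new T[count];
            // Newest items: from head up to the end of the array.
            int firstPart = Math.Min(count, buffer.Length - head);
            Array.Copy(buffer, head, result, 0, firstPart);
            // Older items that wrapped around to the start of the array.
            Array.Copy(buffer, 0, result, firstPart, count - firstPart);
            return result;
        }
    }
}
```

Read then needs at most two block copies regardless of the buffer size, and no index arithmetic per element.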
