2011/04/08

Making a thread safe queue in C++ (I)

Hey folks,

I'm involved at work in converting a Java application to C++. I won't go into the details so as not to bore you, but I ran into the problem of how to get the functionality of Java's BlockingQueue in C++, that is, a thread-safe queue that makes consuming threads wait while the queue is empty. Since I'm using the Boost libraries to make this as interoperable as possible, I first had a look at the Boost data structures, but to no avail: none seemed to fit my needs.

After some digging I decided to implement one of my own, using a mix of the STL queue and a Boost mutex to make it work. So let's get our hands dirty :). At first I thought my queue should have the following methods:

  • pop
  • push
  • size
  • empty
  • toString (to get a string representation of the contents)

Looking for some info on how to do this, I stumbled upon this excellent article (thanks to its author, Anthony Williams), which guided me in the right direction. So after careful reading I started typing some code. Let us see the class declaration:

#include <queue>
#include <sstream>
#include <string>
#include <boost/thread/mutex.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/any.hpp>
#include <boost/thread/condition_variable.hpp>


#ifndef SYNCHRONIZEDQUEUE_DEF
#define SYNCHRONIZEDQUEUE_DEF

template<class T>
class SynchronizedQueue
{

private:
 ///The queue itself
 std::queue<T> sQueue;
 ///A mutex object to control access to the std::queue
 boost::mutex io_mutex_;
 ///A condition variable used to make threads wait until the queue has elements
 boost::condition_variable waitCondition;

public:
 //default Constructor
 SynchronizedQueue();
 //destructor
 virtual ~SynchronizedQueue();
 void push(T& element);
 bool empty();
 bool pop(T& element);
 unsigned int sizeOfQueue();
 void waitAndPop(T& element);
 std::string toString();
  
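};

// The method definitions (or the headers that contain them) go here, inside the include guard.

#endif // SYNCHRONIZEDQUEUE_DEF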

    As you can see above, our SynchronizedQueue class has the typical push, pop, empty and size (sizeOfQueue) methods. The pop method receives a reference to a T object and returns a bool, instead of returning the element itself, which would seem the logical thing to do; I'll explain the reason for this later. Besides, we have a waitAndPop method. I will go into the details later, but for now just note that it makes consuming threads wait until the queue has an element to hand out.

    Attributes

    Regarding the attributes, SynchronizedQueue wraps a standard std::queue:

    std::queue<T> sQueue;
    

    This is the queue that actually stores the data (of type T). I chose to store copies of the objects (T) instead of pointers because I don't control the lifetime of the objects inserted: if one were deleted outside the queue, the queue would be left holding a dangling pointer. Like any standard queue, it is a FIFO structure (first in, first out), which is exactly what I needed for my purposes.

    The next attribute is a boost::mutex, which is used to define protected regions that only one thread at a time can execute.

    boost::mutex io_mutex_;
    

    Lastly, we need a wait condition. In some cases it is useful to make consuming threads wait until the queue has some elements. For this purpose, we need a boost::condition_variable.

    boost::condition_variable waitCondition;
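To illustrate why sQueue holds copies rather than pointers, here is a minimal sketch using a plain std::queue with no locking. The Message type and the function name are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <queue>
#include <string>

// Hypothetical element type, used only for this illustration.
struct Message {
    std::string text;
};

// Returns the text held by the queue after the caller's object was changed.
std::string storedTextAfterOriginalChanges()
{
    std::queue<Message> byValue;   // same element policy as sQueue: holds copies
    Message original{"hello"};
    byValue.push(original);        // the queue stores its own copy
    original.text = "changed";     // changing (or destroying) the original...
    return byValue.front().text;   // ...does not affect the queued copy
}
```

Had the queue stored Message pointers instead, the queued entry would follow every change to the original and would dangle once the original was destroyed.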
    


    Methods

    Since this is a template class, everything goes into the header file, or into independent header files per method (that's what I normally do) that are then included in the main header. Anyway, the constructor and destructor are nothing special:

    template<class T>
    //default constructor
    SynchronizedQueue<T>::SynchronizedQueue(){}
    
    
    template<class T>
    //default destructor
    SynchronizedQueue<T>::~SynchronizedQueue(){}
    

    push

    Below you can see the implementation of the push method:

    template<class T>
    void SynchronizedQueue<T>::push(T& element)
    {
        //lock the mutex (blocks until it is available)
        boost::mutex::scoped_lock lock(io_mutex_);
        //insert the element in the FIFO queue
        SynchronizedQueue::sQueue.push(element);
        //unlock the mutex before notifying; otherwise the woken thread would
        //immediately block again trying to acquire a mutex we still hold
        lock.unlock();
        //notify one waiting thread that it can pop now
        waitCondition.notify_one();
    }
    

    The first important thing to mention is that I'm passing a reference to the object to be inserted. I could pass it by value, but then, since the elements of the queue are declared as T, pushing would copy the object twice: once when the function is called and once when the element is inserted. Passing a reference avoids the unnecessary copy. The first thing we do is lock the mutex, so that only one thread at a time can execute the code that follows. Once it is locked, we push the element onto the queue, unlock the mutex, and notify one waiting thread that it can wake up and consume an element from the queue. The reason for unlocking the mutex before notifying is that, if we notified first, the notified thread would immediately block again because the mutex would still be locked, so it is better to notify afterwards.
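The double-copy argument can be checked with a small sketch. CopyCounter, pushByValue and pushByReference are hypothetical names made up for this illustration, and a plain std::queue without any locking is used so that only the copies are counted.

```cpp
#include <cassert>
#include <queue>

// Hypothetical type that counts how many times it is copy-constructed.
struct CopyCounter {
    static int copies;
    CopyCounter() {}
    CopyCounter(const CopyCounter&) { ++copies; }
};
int CopyCounter::copies = 0;

// Pass by value: one copy to build the parameter, one more inside push().
void pushByValue(std::queue<CopyCounter>& q, CopyCounter element) { q.push(element); }

// Pass by reference: only the copy made by push() itself.
void pushByReference(std::queue<CopyCounter>& q, const CopyCounter& element) { q.push(element); }

int copiesWhenPushingByValue()
{
    std::queue<CopyCounter> q;
    CopyCounter c;
    CopyCounter::copies = 0;
    pushByValue(q, c);
    return CopyCounter::copies;
}

int copiesWhenPushingByReference()
{
    std::queue<CopyCounter> q;
    CopyCounter c;
    CopyCounter::copies = 0;
    pushByReference(q, c);
    return CopyCounter::copies;
}
```

The sketch takes a const reference, which also accepts temporaries; that is a choice made for the example, not something the class declaration above does.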

    empty

    Nothing special about this method, besides the fact that we need to protect the call to std::queue::empty() with the mutex in order to avoid race conditions.

    template<class T>
    bool SynchronizedQueue<T>::empty()
    {
        //try to lock the mutex
        boost::mutex::scoped_lock lock(io_mutex_);
        return SynchronizedQueue::sQueue.empty();
    }
    

    pop

    Now the fun part: this method returns true or false depending on whether an element could be extracted from the queue. Use it if you don't want consumer threads to block while the queue is empty. It tries to pop the front element, copies it into the passed reference and returns true; if the queue is empty it returns false immediately, so the thread can do something else instead of blocking.

    template<class T>
    bool SynchronizedQueue<T>::pop(T& element)
    {
    
         //lock the mutex (blocks until it is available)
         boost::mutex::scoped_lock lock(io_mutex_);
    
         //if the queue is empty there is nothing to extract
         if (SynchronizedQueue::sQueue.empty())
         {
             return false;
         }
    
         //copy the element at the front, i.e. the oldest one in the queue
         element = SynchronizedQueue::sQueue.front();
         //remove it from the queue, since we now have a copy in element
         SynchronizedQueue::sQueue.pop();
    
         //no need to unlock the mutex, it will get unlocked when the function ends
         return true;
    
    }
    

    Once again, the first thing to do is lock the mutex, and then check whether the queue is empty. If it is, we return false; otherwise we copy the element at the front of the queue (the oldest one, since this is a FIFO) and then remove it by calling std::queue::pop(). In this case there is no need to unlock the mutex explicitly: pop does not notify anyone, so nothing has to happen after the unlock, and the scoped lock releases the mutex automatically at the end of the method.
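Doing the emptiness check and the removal under a single lock is also what makes the bool-returning signature useful: a caller that asked empty() first and then popped could lose the element to another consumer in between. Below is a minimal sketch of that non-blocking pattern. It assumes C++11 and uses std::mutex as a stand-in for boost::mutex; TryPopQueue and drainAndSum are hypothetical names, not part of the class above.

```cpp
#include <cassert>
#include <mutex>
#include <queue>

// Minimal stand-in for the non-blocking part of SynchronizedQueue.
template <class T>
class TryPopQueue {
    std::queue<T> items_;
    std::mutex mutex_;
public:
    void push(const T& element)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push(element);
    }

    // Check and removal happen under the same lock, so no other
    // consumer can take the element between the two steps.
    bool pop(T& element)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) {
            return false;
        }
        element = items_.front();
        items_.pop();
        return true;
    }
};

// Drains whatever is queued right now and returns the sum, without ever blocking.
int drainAndSum(TryPopQueue<int>& q)
{
    int sum = 0;
    int value = 0;
    while (q.pop(value)) {
        sum += value;
    }
    return sum;
}
```

A consumer can call pop in its own loop and go do other work whenever it returns false.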

    waitAndPop

    This method is very similar to the previous one; the main difference is that it makes the calling thread block for as long as the queue is empty.

    template<class T>
    void SynchronizedQueue<T>::waitAndPop(T& element)
    {
    
        //lock the mutex (blocks until it is available)
        boost::mutex::scoped_lock lock(io_mutex_);
        //while the queue is empty, make the calling thread wait;
        //wait() releases the mutex while blocked and re-acquires it before returning
        while(SynchronizedQueue::sQueue.empty())
        {
            waitCondition.wait(lock);
        }
    
        //the queue is not empty and we hold the mutex: copy the front element
        element = SynchronizedQueue::sQueue.front();
    
        //remove it from the queue
        sQueue.pop();
    
    }
    

    As you can see, right after locking the mutex we check whether the queue is empty. If so, we call the wait method, which releases the mutex and blocks the thread until another thread calls notify_one or notify_all. If you recall, push does this after inserting an element. When a waiting thread wakes up, it re-acquires the mutex and checks once again whether the queue is empty; this is why the check is a while loop rather than an if, since the thread can wake up spuriously or another consumer may already have taken the element. Once the queue is not empty, it can continue. The rest is just like the pop method.
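The interplay between push and waitAndPop can be sketched with one producer and one consumer. This is only an illustration under stated assumptions: it uses the C++11 std::mutex, std::condition_variable and std::thread as stand-ins for their Boost counterparts, and BlockingQueueSketch and consumeFromProducer are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

template <class T>
class BlockingQueueSketch {
    std::queue<T> items_;
    std::mutex mutex_;
    std::condition_variable notEmpty_;
public:
    void push(const T& element)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        items_.push(element);
        lock.unlock();            // release before notifying, as in the article
        notEmpty_.notify_one();
    }

    void waitAndPop(T& element)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        // The predicate form of wait() re-checks the condition after every
        // wake-up, which is equivalent to the explicit while loop.
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        element = items_.front();
        items_.pop();
    }
};

// The consumer blocks until the producer has pushed `count` values; returns their sum.
int consumeFromProducer(int count)
{
    BlockingQueueSketch<int> q;
    int sum = 0;
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) {
            int value = 0;
            q.waitAndPop(value);  // sleeps whenever the queue is empty
            sum += value;
        }
    });
    for (int i = 1; i <= count; ++i) {
        q.push(i);
    }
    consumer.join();              // after join() it is safe to read sum
    return sum;
}
```

Depending on the toolchain, building this may require enabling thread support (for example the -pthread flag with GCC).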

    sizeOfQueue

    Self-explanatory method: it just returns the current queue size.

    template<class T>
    unsigned int SynchronizedQueue<T>::sizeOfQueue()
    {
        //lock the mutex (blocks until it is available)
        boost::mutex::scoped_lock lock(io_mutex_);
        return SynchronizedQueue::sQueue.size();
    
    }
    

    toString

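    I added this method for convenience; it tries to build a string representation of the queue contents. Since the elements in the queue can be almost *anything*, the only output that is always available is the name of the element type. boost::any is used to obtain it. Note that the string returned by type().name() is implementation-defined, so on some compilers it is a mangled name.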

    template<class T>
    std::string SynchronizedQueue<T>::toString()
    {
        std::stringstream os;
    
        //make a copy of the queue while holding the mutex, so we can walk the
        //copy without interfering with producer and consumer threads
        boost::mutex::scoped_lock lock(io_mutex_);
        std::queue<T> copy = SynchronizedQueue::sQueue;
        lock.unlock();
    
        //check the queue is not empty
        if (!copy.empty())
        {
            int counter = 0;
    
            os << "Elements in the Synchronized queue are as follows:" << std::endl;
            os << "**************************************************" << std::endl;
    
            while (!copy.empty())
            {
                //get the first element in the queue
                boost::any object = copy.front();
                std::string b = "Element at position " + boost::lexical_cast<std::string>(counter) + " is: ";
                b.append("[");
                b.append(object.type().name());
                b.append("]\n");
                os << b;
                //remove the element from the copy and move on to the next position
                copy.pop();
                ++counter;
            }
            return os.str();
        }
        else
        {
            os << "Queue is empty";
            return os.str();
        }
    }
    

    The first thing to notice is that I make a copy of the queue before iterating over the elements, so that building the string does not interfere with producer and consumer threads. The copy itself reads the shared queue, so it is made while holding the mutex; the lock is released before the string is built. In any case, I just made this method for debugging and testing purposes, and it shouldn't be used in a production environment. You probably also noticed that I'm not using iterators to loop over the queue elements, because std::queue does not provide them. Thus, I took a simple front-then-pop approach for as long as there are elements. Probably not optimal at all, but as I said this is only for testing purposes.
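If the element type can be written to a stream with operator<<, a variant that prints the values rather than the type name is possible. The following is only a sketch under that assumption; it uses std::mutex as a stand-in for boost::mutex, and PrintableQueue is a hypothetical name. It takes a snapshot of the queue under the lock and formats the snapshot after the lock is released.

```cpp
#include <cassert>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>

template <class T>
class PrintableQueue {
    std::queue<T> items_;
    mutable std::mutex mutex_;   // mutable so that toString() can be const
public:
    void push(const T& element)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push(element);
    }

    // Assumes T supports operator<< on std::ostream.
    std::string toString() const
    {
        std::queue<T> snapshot;
        {
            // Hold the lock only while copying the queue.
            std::lock_guard<std::mutex> lock(mutex_);
            snapshot = items_;
        }
        std::ostringstream os;
        os << "[";
        bool first = true;
        while (!snapshot.empty()) {
            if (!first) {
                os << ", ";
            }
            os << snapshot.front();   // oldest element first (FIFO order)
            snapshot.pop();
            first = false;
        }
        os << "]";
        return os.str();
    }
};
```

Keeping the formatting outside the critical section means producers and consumers are blocked only for the duration of the copy.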

    You can download the source from here.

    Conclusion

    There are surely better solutions that are already implemented and better tested than this one, but I enjoyed the process of making it as my first step into the thrilling world of multi-threaded programming. I tried it, and after some testing it seems to work nicely. If you want your threads to wait when reading, use waitAndPop; if not, use pop. It's that easy. Besides, you can apply this approach to any other STL container you can think of.
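As a rough illustration of applying the same idea to another container, here is a sketch that wraps std::stack instead of std::queue. It assumes C++11 and uses std::mutex as a stand-in for boost::mutex; SynchronizedStackSketch is a hypothetical name and the class has not been tested beyond this simple single-threaded use.

```cpp
#include <cassert>
#include <mutex>
#include <stack>

template <class T>
class SynchronizedStackSketch {
    std::stack<T> items_;
    std::mutex mutex_;
public:
    void push(const T& element)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        items_.push(element);
    }

    // Same contract as the queue's pop: false if there was nothing to take.
    bool pop(T& element)
    {
        std::lock_guard<std::mutex> lock(mutex_);
        if (items_.empty()) {
            return false;
        }
        element = items_.top();   // LIFO: the most recently pushed element
        items_.pop();
        return true;
    }
};
```

Only the wrapped container and the accessor (top instead of front) change; the locking pattern stays the same.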

    Stay tuned for the next article on this subject in which a test program for this queue is explained.

    1 comment:

    1. Hi,

      I was looking for a thread-safe queue like this, and your post and code look like what I was looking for. Thank you for sharing the logic and the code. Did you ever have a chance to publish a test program for using this queue?

      Thanks

      Jeff
