Dual Data Structures

Dual data structures are concurrent data structures that hold not only data, but also read requests in the form of reservations. By holding both data and anti-data, dual data structures achieve better performance and fairness than other blocking and non-blocking concurrent data structures.

In multi-threaded programs, threads typically communicate with each other by calling the API of a shared data structure. It is challenging to keep a data structure in a sound state when it is used concurrently by several threads. There are many blocking and non-blocking solutions to this problem. In this post, we want to see how dual data structures deal with it.

Before moving forward: with the powerful hardware available today, do we need parallelism? Can't we just stick to a single thread and make our lives easy?
Single-thread hardware performance improvement has slowed significantly since 2004 (see the chart below from [1]). To keep increasing processing power, manufacturers have been adding more cores ever since. So if the processing power of two decades ago is enough for you, and you are OK with wasting the resources that multiple cores make available to you today, yes, you can avoid parallel processing. However, to fully utilize your modern hardware, you have no choice but to embrace parallelism.

Figure 1. Performance = highest SPECInt by year [1]

Data Structures and Concurrency

This is what we ideally want when dealing with concurrency:
  • Let threads concurrently access the data structure
  • Provide an illusion of sequential access
Let's see how concurrent updates can violate the illusion of sequential access and break a queue data structure. Consider a queue with a single entry A in it. Thread 1 wants to dequeue, and Thread 2 wants to enqueue B. They both find the queue in the same initial state and proceed with their dequeue/enqueue logic.

Thread 1 {
    sees head and tail are A
    sets head and tail to NULL
}

Thread 2 {
    sees tail is A  
    sets A.next to B
    sets tail to B
}

Since we have two operations, only two serial executions are possible: 1) Thread1.dequeue, Thread2.enqueue, or 2) Thread2.enqueue, Thread1.dequeue. Both cases result in a queue with only B in it. 

Sketch 1. A queue with a single entry and our desired end state after a dequeue and an enqueue.

However, in a concurrent execution, the following interleaving may happen.
Thread 1: sees head and tail are A
Thread 2: sees tail is A
Thread 2: sets A.next to B
Thread 2: sets tail to B
Thread 1: sets head and tail to NULL

After the above execution, the final state of the queue is empty, which does not conform to any serial execution.
Sketch 2. A possible incorrect end state due to concurrently executing an enqueue and a dequeue.
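
To make the race concrete, here is a minimal Java sketch of a deliberately unsynchronized linked queue; the class and method names are mine, for illustration only. If Thread 1 executes its reads before Thread 2's enqueue but its writes after it, B is lost exactly as in the interleaving above.

    //A deliberately unsynchronized queue; running enqueue and dequeue
    //concurrently on it can reproduce the lost-update race described above
    class UnsafeQueue {
        static class Node {
            final String value;
            Node next;
            Node(String value) { this.value = value; }
        }

        Node head, tail;

        void enqueue(String value) {      //Thread 2's steps
            Node node = new Node(value);
            if (tail == null) {           //empty queue
                head = tail = node;
            } else {
                tail.next = node;         //sets A.next to B
                tail = node;              //sets tail to B
            }
        }

        String dequeue() {                //Thread 1's steps
            if (head == null) return null;
            Node node = head;
            if (head == tail) {           //sees head and tail are A...
                head = tail = null;       //...sets head and tail to NULL, losing B
            } else {
                head = head.next;
            }
            return node.value;
        }
    }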

Synchronization

One way to solve the above problem is to guarantee atomicity for the operations using locks. Specifically, each thread first obtains an exclusive lock on the data structure, performs its operation, and finally releases the lock. We can do that with synchronized blocks or methods in Java, as sketched below. This approach does not perform well, especially if the thread holding the lock is preempted by the OS scheduler before finishing its operation and releasing the lock.
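
Here is a minimal sketch of this lock-based approach in Java; the class name and the use of ArrayDeque are my choices for illustration. Each operation runs atomically while holding the queue's monitor, so the bad interleaving above becomes impossible.

    import java.util.ArrayDeque;

    //Every operation takes an exclusive lock on the whole queue
    class SynchronizedQueue {
        private final ArrayDeque<String> items = new ArrayDeque<>();

        public synchronized void enqueue(String value) {
            items.addLast(value);
        }

        public synchronized String dequeue() {
            return items.pollFirst(); //returns null if the queue is empty
        }
    }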

Non-blocking Queue

There are many non-blocking data structures that avoid locks while letting multiple threads concurrently execute operations on the data structure. Typically, these data structures rely on synchronization primitives supported by the hardware, such as the Compare-And-Swap (CAS) operation. A non-blocking queue (known as the M&S queue) using CAS is presented in [2]. In this data structure, we have a dummy node, defined as the node whose next pointer points to the first actual node inserted into the queue. The head pointer of the queue always points to the dummy node. Initially, both the head and tail pointers point to the dummy node. After enqueuing, the head pointer still points to the dummy node, and the tail pointer points to the newest item added to the queue.

Sketch 3. Structure of an M&S non-blocking queue

To enqueue, we basically want to do this:

  1. Set the current tail->next to the new node
  2. Set tail to the new node. 
However, to make sure no other thread changes the queue while we are doing this, we perform the above operations with CAS. Basically, the idea is:
  • We set tail->next to the new node only if tail->next is NULL. If another thread managed to enqueue before this thread, the CAS operation fails and we have to retry.
  • We set the tail of the queue to the new node only if the tail has not changed. For example, if after setting the next pointer of the former tail, and right before updating the tail, another thread enqueues another node and updates the tail, the CAS fails and the first thread does not update the tail anymore.
In addition, there is a subtle point: suppose an enqueuer thread gets suspended right after updating the next pointer of the former tail and before updating the tail pointer. Now another thread wants to enqueue. In this situation, we don't want the second thread to be blocked. Thus, the second thread goes ahead, fixes the tail, and retries. Later, when the first thread wakes up, its CAS to update the tail will fail, because the tail has been changed by the second thread. This case is handled in the pseudocode below. Note that we don't need such cleanup when using locks, because a thread releases the lock only after everything, including updating the tail, is done. Thus, no other thread can find a stale tail.

    boolean CAS(variable_to_be_set, expected_value, new_value)

    enqueue(value) {
        node = new node()
        node->value = value
        node->next = NULL
        while (true) {
            old_tail = tail        
            old_next = old_tail->next
            if (tail == old_tail) {
                if (old_next == NULL) {
                    // if CAS fails, it means another thread has enqueued, we have to try again
                    if (CAS(old_tail->next, NULL, node)) (A)
                        break
                } else {
                    //tail must be fixed
                    CAS(tail, old_tail, old_next)
                }
            }
        }
        //we update the tail if tail is still the one we saw. Otherwise, no action is needed    
        CAS(tail, old_tail, node) (B)
    }
Sketch 4. Enqueuing into a non-blocking queue. To update the next pointer of the former tail and the tail itself, we use CAS.
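
To see how these CAS calls map to a real language, here is a hedged Java sketch of the enqueue above using AtomicReference.compareAndSet; the class and field names are my own, not from [2].

    import java.util.concurrent.atomic.AtomicReference;

    class MSQueue<T> {
        static final class Node<T> {
            final T value;
            final AtomicReference<Node<T>> next = new AtomicReference<>(null);
            Node(T value) { this.value = value; }
        }

        private final AtomicReference<Node<T>> head;
        private final AtomicReference<Node<T>> tail;

        MSQueue() {
            Node<T> dummy = new Node<>(null); //head always points to a dummy node
            head = new AtomicReference<>(dummy);
            tail = new AtomicReference<>(dummy);
        }

        void enqueue(T value) {
            Node<T> node = new Node<>(value);
            while (true) {
                Node<T> oldTail = tail.get();
                Node<T> oldNext = oldTail.next.get();
                if (oldTail == tail.get()) {          //tail unchanged since we read it
                    if (oldNext == null) {
                        //(A) link the new node; fails if another thread enqueued first
                        if (oldTail.next.compareAndSet(null, node)) {
                            //(B) swing the tail; losing this CAS is fine, another thread fixed it
                            tail.compareAndSet(oldTail, node);
                            return;
                        }
                    } else {
                        //the tail is lagging; help fix it, then retry
                        tail.compareAndSet(oldTail, oldNext);
                    }
                }
            }
        }
    }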

To dequeue, we return FALSE if the queue is empty (head == tail); otherwise:

  1. Set the returned value to head->next->value (note that head is the dummy node).
  2. Set head to head->next.

and return TRUE.

To make sure another thread does not dequeue before we update the head, we use CAS. Similar to the enqueue, we might need to fix the tail: when we find the queue empty (head == tail) but head->next is not NULL, we fix the tail by setting it to head->next using CAS. We can free the memory of the former head right before returning TRUE in the dequeue function. Note that it is critical to read the value of old_next before the CAS (see the comment in the pseudocode below); otherwise, old_next might be freed by another thread before we read its value.

    dequeue(returned_value) {
        while (true) {
            old_head = head //dummy node
            old_tail = tail
            old_next = old_head->next //the actual front of the queue
            if (head == old_head){
                if (old_head == old_tail) {
                    if (old_next == NULL) {
                        //queue is empty and no clean-up needed.
                        return FALSE
                    }
                    //queue is NOT really empty and the tail must be fixed
                    CAS(tail, old_tail, old_next)
                } else {
                    //critical to read the value of old_next before updating the head
                    returned_value = old_next->value
                    //if CAS fails, it means another thread has dequeued a new entry before this thread,
                    //we have to try again
                    if (CAS(head, old_head, old_next)) (A)
                        break
                }
            }
        }
        free(old_head)
        return TRUE
    }
Sketch 5. Dequeuing a non-blocking queue. We update the head pointer with CAS. The node that held the returned value becomes the new dummy node pointed to by the head pointer.
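
And here is the matching dequeue for the Java MSQueue sketch above; again only a sketch, with Java's garbage collector taking the place of the explicit free.

    //belongs to the MSQueue<T> class sketched after Sketch 4
    T dequeue() {
        while (true) {
            Node<T> oldHead = head.get();
            Node<T> oldTail = tail.get();
            Node<T> oldNext = oldHead.next.get();
            if (oldHead == head.get()) {                  //head unchanged since we read it
                if (oldHead == oldTail) {
                    if (oldNext == null) {
                        return null;                      //queue is empty
                    }
                    tail.compareAndSet(oldTail, oldNext); //tail is lagging: fix it
                } else {
                    T value = oldNext.value;              //read the value BEFORE swinging the head
                    if (head.compareAndSet(oldHead, oldNext)) {
                        return value;                     //oldNext is the new dummy
                    }
                }
            }
        }
    }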

Dequeuing an Empty Queue

No matter whether a data structure uses locks or is lock-free, when a thread wants to read from it and no data is ready, the thread must wait. A typical situation where this happens is a pool of worker threads taking jobs from a queue: the worker threads are blocked when the queue is empty.

Let's see how we can deal with the lack of data. When we use locks, we can put a reader thread to sleep and wake it up when data is ready. However, we cannot do that with a non-blocking queue, as we don't use any locks. Thus, a common approach is to use a busy-wait loop:

    do {
        t = q.dequeue()
    } while (t == NULL)
    // process t
But this approach has two problems:
  • It introduces unnecessary contention for memory and communication bandwidth, which degrades performance.
  • It does not provide fairness; suppose Thread 1 and Thread 2 both want to dequeue, and Thread 2 calls dequeue after Thread 1. With this approach, there is no guarantee that Thread 1 will get its data before Thread 2.

Dual Data Structures

Dual data structures were invented to address the issues mentioned above when dealing with a lack of data. They are called "dual" because a dual data structure holds both data and anti-data in the form of reservations.

The dual queue [3] is a dual data structure based on the non-blocking queue of [2]. Specifically, as long as the number of calls to dequeue does not exceed the number of calls to enqueue, a dual queue behaves like a non-blocking queue. In a dual queue, in addition to data nodes, we may have reservation nodes. Like a non-blocking queue, a dual queue has a dummy node. Thus, the first data/reservation node is always the second node in the queue.

Sketch 6. The idea of dual data structures is to hold both data and anti-data

The reservation nodes track the dequeue attempts that occur when the queue does not have any data available (case (A) below). Specifically, when the queue is empty or contains only reservations, the dequeue method adds a reservation node to the queue and then spins on the request pointer of the former tail, i.e., it keeps checking it until it is not NULL (see the spin loop in the pseudocode below). Otherwise, dequeue returns the next of the head and updates the head (case (B) below).

    dequeue() {
        node = new node() //it is a reservation
        node->request = NULL
        node->isRequest = true
        while (true) {
            old_tail = tail        
            old_head = head
            if (old_tail == old_head || old_tail->isRequest) { (A)
                //In this case, (possibly) either the queue is empty or no data is available.
                //We'd like to enqueue a reservation, but we need further checks before that.
                old_next = old_tail->next
                if (tail == old_tail) {
                    if (old_next != NULL) {
                        //The tail must be fixed
                        CAS(tail, old_tail, old_next)
                    } else if (CAS(old_tail->next, NULL, node)) {
                        //Either queue was empty or no data exists, so we went ahead and
                        //succeeded in enqueuing the new reservation using CAS. Now, we try to update the tail
                        CAS(tail, old_tail, node)
                        //fixing the head if necessary
                        if (old_head == head && old_head->request != NULL) {
                            CAS(head, old_head, old_head->next)
                        }
                        //Note that we spin on the OLD tail
                        while (old_tail->request == NULL) {} //spin until an enqueuer sets it
                        old_head = head
                        if (old_head == old_tail) {
                            CAS(head, old_head, node)
                        }
                        result = old_tail->request->value
                        free(old_tail->request)
                        free(old_tail)
                        return result
                   }
               }      
            } else { (B)
                //In this case, (possibly) we have data available.
                //So we try to dequeue it instead of adding a new reservation
                old_next = old_head->next
                if (old_tail == tail) {
                    result = old_next->value //note that the head is dummy
                    if (CAS(head, old_head, old_next)) {
                        free(old_head)
                        free(node) //our pre-allocated reservation was not needed
                        return result
                    }
                }
            }
        }
    }

The enqueue method adds a new data node to the queue if the queue is empty or contains only data nodes (case (A) below). Otherwise, it fulfills the request at the head of the queue instead of adding a new data node (case (B) below). To fulfill a pending request, the enqueue method CASes the request pointer of the head node to a data node that lives outside the queue. Setting the request breaks the spin of the dequeuer thread.

    enqueue(value) {
        node = new node()
        node->value = value
        node->next = NULL
        node->request = NULL
        node->isRequest = false
        while (true) {
            old_tail = tail        
            old_head = head
            if (old_tail == old_head || !old_tail->isRequest) { (A)
                //In this case, (possibly) either the queue is empty or no request is waiting.
                //We'd like to enqueue a new data node, but we need further checks before that.
                old_next = old_tail->next
                if (tail == old_tail) {
                    if (old_next != NULL) {
                        //The tail must be fixed
                        CAS(tail, old_tail, old_next)
                    } else if (CAS(old_tail->next, NULL, node)) {
                        //Either queue was empty or no request exists, so we went ahead and
                        //succeeded in enqueuing the new node using CAS. Now, we try to update the tail
                        CAS(tail, old_tail, node)
                        //whether or not this CAS succeeds, we are done and return here
                        return
                   }
               }      
            } else { (B)
                //In this case, (possibly) we have a waiting request.
                //So we try to satisfy that instead of enqueuing a new node.
                old_next = old_head->next
                if (old_tail == tail) {
                    req = old_head->request
                    if (head == old_head) {
                        //If head request (req) is NULL, i.e., it is not yet satisfied, we set it to the new node.
                        success = (!req && CAS(old_head->request, req, node))
                        //Pop the head whether or not we consumed the new node.
                        //If we didn't consume it, we are basically doing cleanup left over by another thread
                        CAS(head, old_head, old_next)
                        //We return if we actually satisfied a request and consumed the new node.
                        //Otherwise we repeat the loop
                        if (success) return
                    }
                }
            }
        }
    }

In the dequeue algorithm explained above, we still have a busy loop similar to the one we used with the non-blocking queue. Doesn't that cause the same performance issue it does for the non-blocking queue?
No, because there is an important difference between spinning in the dequeue method of the dual queue and spinning on a non-blocking queue: in the dual queue, the dequeuer and the enqueuer are the only threads that access the request pointer of the node, and the enqueuer accesses it only once, to set it and break the spin. Thus, there is virtually no contention over the pointer. With the non-blocking queue, on the other hand, multiple threads all want to read the head of the queue. This contention causes cache misses, leading to performance degradation.
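
The following Java fragment sketches this kind of local spinning; it is my illustration of the idea (a dedicated per-waiter flag), not code from [3]. Each waiter spins on its own reservation's request field, which only the matching enqueuer ever writes.

    import java.util.concurrent.atomic.AtomicReference;

    final class Reservation<T> {
        final AtomicReference<T> request = new AtomicReference<>(null);

        //called by the dequeuer: spin on OUR OWN node, so no other
        //waiters contend for this cache line
        T await() {
            T value;
            while ((value = request.get()) == null) {
                Thread.onSpinWait(); //hint to the CPU that we are busy-waiting
            }
            return value;
        }

        //called once by the matching enqueuer; setting the field breaks the spin
        boolean fulfill(T value) {
            return request.compareAndSet(null, value);
        }
    }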

The above description of the dual queue is based on what is presented in [3] and its companion pseudocode. However, it seems to me that they are slightly different from what Michael Scott describes in his talk [5].

Java Implementations

ConcurrentLinkedQueue is a non-blocking queue based on the M&S queue from [2], explained above.
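
A minimal usage sketch: poll() never blocks and returns null on an empty queue, so deciding how to wait for data is left to the caller (e.g., the busy-wait loop above).

    import java.util.concurrent.ConcurrentLinkedQueue;

    public class ClqExample {
        public static void main(String[] args) {
            ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
            queue.offer("job-1");             //non-blocking enqueue
            System.out.println(queue.poll()); //job-1
            System.out.println(queue.poll()); //null: no data available
        }
    }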


SynchronousQueue is the synchronous version of the dual queue. With the normal dual queue explained above, only a dequeue may block due to a lack of available data. With a synchronous queue, on the other hand, an enqueue may also block due to a lack of a request. Thus, with a SynchronousQueue, producers and consumers pair up to synchronously produce and consume data.
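
A minimal usage sketch: put() blocks until a consumer takes, and take() blocks until a producer puts, so the two threads meet at the hand-off point.

    import java.util.concurrent.SynchronousQueue;

    public class SyncQueueExample {
        public static void main(String[] args) throws InterruptedException {
            SynchronousQueue<String> queue = new SynchronousQueue<>(true); //fair (FIFO) mode
            Thread producer = new Thread(() -> {
                try {
                    queue.put("hello"); //blocks until the consumer arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            System.out.println(queue.take()); //blocks until the producer arrives
            producer.join();
        }
    }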


Exchanger is another dual data structure in the Java standard library, based on [4]. With an exchanger, there is no distinction between producer and consumer: every thread is both, and threads pair up to exchange data with each other.
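
A minimal usage sketch: both threads call exchange() and each receives the other's value.

    import java.util.concurrent.Exchanger;

    public class ExchangerExample {
        public static void main(String[] args) throws InterruptedException {
            Exchanger<String> exchanger = new Exchanger<>();
            Thread peer = new Thread(() -> {
                try {
                    //blocks until the other thread also calls exchange()
                    System.out.println("peer got: " + exchanger.exchange("from-peer"));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            peer.start();
            System.out.println("main got: " + exchanger.exchange("from-main"));
            peer.join();
        }
    }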


Summary

Dual data structures are interesting data structures that, by holding both data and anti-data, provide better fairness and performance than other concurrent data structures. We focused on queues in this post, but dual versions of other non-blocking data structures have also been designed. If you'd like to learn more, check out the original papers cited in this post, or the source code of the Java implementations.

References

[2] Maged M. Michael and Michael L. Scott. "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms." In Proceedings of the Fifteenth Annual ACM Symposium on Principles of Distributed Computing (PODC), pp. 267-275, 1996.

[3] William N. Scherer III and Michael L. Scott. "Nonblocking Concurrent Data Structures with Condition Synchronization." In International Symposium on Distributed Computing (DISC), pp. 174-187. Springer, 2004.

[4] William N. Scherer III, Doug Lea, and Michael L. Scott. "A Scalable Elimination-based Exchange Channel." SCOOL '05 (2005): 83.

[5] Michael Scott. "Dual Data Structures" (Hydra 2019). https://2019.hydraconf.com/2019/talks/lnhqpq8cz5kzgjmhlqpic/
