Distributed Transactions in FoundationDB

FoundationDB is an open-source database that provides distributed ACID transactions. In this post, we will see how FoundationDB uses optimistic and multi-version concurrency control techniques to provide strictly serializable distributed transactions. 

Before we start, I suggest that you study the official FoundationDB resources that I have cited at the end. This post is complementary to those resources and tries to answer questions that might arise while reading them. 


Figure 1 shows the overall architecture of FoundationDB. We can categorize components into stateful and stateless components. 
Figure 1. The architecture of FoundationDB [1]. The blue boxes are stateful and the green boxes are stateless components.

Stateful Components
  • Coordinators: The job of the coordinators is to elect a cluster controller by running a consensus algorithm. The first version of FoundationDB used ZooKeeper for this, but that was later changed. 
  • Transaction Logs: The transaction logs form a distributed Write-Ahead Log (WAL). Each transaction log is replicated for higher availability. In this post, we don't consider a sharded transaction log. Thus, we consider the case where we have only one transaction log, and it is replicated. 
  • Storage Servers: Each storage server is an individual key-value store hosting a part of the data. Each storage server is replicated. Storage servers directly serve the read requests. Each storage server is tied to only one of the replicas of the transaction log and pulls updates from it.  

Stateless Components

  • Cluster Controller: The cluster controller is picked by the coordinators. All components register themselves with the cluster controller. 
  • Master: The job of the master is to assign commit timestamps to transactions. There is only one instance of the master. The master is picked by the cluster controller.  
  • Resolvers: Resolvers perform conflict checking on transactions and vote to commit or abort them. Together, the resolvers maintain a cache of all versions written in the past 5 seconds. The resolvers are sharded by key range, i.e. each resolver maintains the versions written for a particular range of keys.
  • Proxies: As the name suggests, proxies are the agents of the client for communicating with the rest of the system. A proxy gets a commit timestamp from the master, performs conflict checking via the resolvers, and appends committed transactions to the transaction logs. 

Read Path

As said above, reads are directly served by the storage servers. Thus, when a client wants to read a version, it directly asks the storage server hosting the key. If the client does not know which server it has to go to, it asks one of the proxies. Before a transaction can read, it first gets a read version from the database. We will explain below how this version is assigned to the transaction. All reads in the context of a transaction use this read version. A storage server maintains a version chain for each key that it hosts. When a client wants to read a key at a particular read version t, the server returns the most recent value with a version less than or equal to t. 
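The lookup described above can be sketched as follows. This is only an illustration, not FoundationDB's storage engine: the VersionChain class and its method names are hypothetical, and the sketch assumes mutations for a key arrive in increasing version order.

```python
from bisect import bisect_right


class VersionChain:
    """Versions of a single key, sorted by commit version (hypothetical helper)."""

    def __init__(self):
        self.versions = []  # commit versions, ascending
        self.values = []    # values[i] was written at versions[i]

    def write(self, version, value):
        # Assumption: mutations are applied in commit order, so appending keeps the list sorted.
        assert not self.versions or version > self.versions[-1]
        self.versions.append(version)
        self.values.append(value)

    def read(self, read_version):
        # Index of the first version strictly greater than read_version.
        i = bisect_right(self.versions, read_version)
        # If i == 0, nothing was written at or before read_version.
        return self.values[i - 1] if i > 0 else None


chain = VersionChain()
chain.write(10, "a")
chain.write(20, "b")
chain.write(30, "c")

value_at_25 = chain.read(25)  # "b": version 20 is the newest one <= 25
value_at_30 = chain.read(30)  # "c": a version equal to the read version is visible
value_at_5 = chain.read(5)    # None: the key did not exist yet
```

Binary search is used here only because the chain is sorted; any structure that finds the newest version not exceeding t would do.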

How can we be sure that a replica is up-to-date enough? Maybe there is a more recent value with version less than t, but the replica has not received it yet. 
This is very similar to snapshot timestamp in Spanner as we saw in this post. In Spanner, we have t_safe to make sure a replica is updated enough and has all values with versions less than t_safe. Most likely FoundationDB is using a similar approach. 

Write Path

Writes are simply buffered on the client. FoundationDB uses optimistic concurrency control. Thus, there is no 2PL, and no lock is acquired for the items that a transaction touches. At commit time, the client submits its read and write sets to one of the proxies. The read set includes all keys read in the context of the transaction together with their versions, and the write set is the set of key-value pairs written in the context of the transaction. Once the proxy gets the commit request from the client, it asks the master to assign a commit timestamp to the transaction. The master is the only singleton in the system. However, it does not become a scalability bottleneck, for two reasons: 
  1. Its job is very simple, i.e., just assigning commit timestamps to transactions. 
  2. If there are lots of transactions waiting for a commit timestamp, the proxy batches them all and sends a single request to the master. 
After getting the commit timestamp, the proxy asks the resolvers to decide whether it should commit the transaction or not. Each resolver checks the read set of the transaction. If the resolver finds that there is a value committed with a version greater than the version of the value read by the transaction, it votes to abort the transaction. The proxy collects the responses from all resolvers. If all of them said Yes, the proxy decides to commit the transaction. Otherwise, it aborts the transaction. 
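A minimal sketch of the conflict check, assuming a single resolver and ignoring the 5-second window. The Resolver class, its resolve method, and the use of version 0 for "never written" are assumptions made for illustration; they are not FoundationDB's actual interfaces.

```python
class Resolver:
    """Remembers the latest commit version per key (simplified, hypothetical)."""

    def __init__(self):
        self.last_commit = {}  # key -> latest commit version seen for that key

    def resolve(self, read_set, write_set, commit_version):
        # read_set maps each key to the version of the value the transaction read.
        for key, version_read in read_set.items():
            # A newer version of a key we read was committed in the meantime: vote No.
            if self.last_commit.get(key, 0) > version_read:
                return False
        # No conflict: remember this transaction's writes for future checks.
        for key in write_set:
            self.last_commit[key] = commit_version
        return True


resolver = Resolver()

# T1 and T2 both read "x" before anyone wrote it (version 0), then both try to update it.
t1_ok = resolver.resolve(read_set={"x": 0}, write_set={"x": "from T1"}, commit_version=1)
t2_ok = resolver.resolve(read_set={"x": 0}, write_set={"x": "from T2"}, commit_version=2)

# T3 read the value written by T1 (version 1), so it does not conflict.
t3_ok = resolver.resolve(read_set={"x": 1}, write_set={"x": "from T3"}, commit_version=3)
```

T1 commits, T2 is rejected because "x" changed after T2 read it, and T3 commits. No locks are taken at any point; the conflict is only detected at commit time.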

Can we say the proxy basically runs a 2PC? 
It is not 2PC, as we don't have two phases. Once the proxy gets the responses from the resolvers, it does not send the second phase of 2PC to let them know about the decision. 

But we said the resolver keeps track of all versions committed in the last 5 seconds. How can a resolver do that without knowing if the transaction finally committed or not? 
When a resolver votes Yes for a transaction, it assumes the transaction will commit, so it goes ahead and updates its cache. Of course, that might not be the case, and the transaction may abort due to a No from another resolver. However, a resolver treating an aborted transaction as committed does not violate correctness. It can only cause future transactions to be aborted because of this false belief. Aborting a transaction can never violate serializability. However, it might be bad from a performance point of view.  
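The false abort described above can be reproduced with two resolvers sharded by key range. This is a sketch under assumptions: the Resolver class, the shard_of split at the letter "n", and the commit helper are all hypothetical, and version 0 stands for "never written".

```python
class Resolver:
    """Votes on its own key range and records writes as soon as it votes Yes (hypothetical)."""

    def __init__(self):
        self.last_commit = {}  # key -> latest commit version this resolver believes in

    def vote(self, read_set, write_keys, commit_version):
        if any(self.last_commit.get(k, 0) > v for k, v in read_set.items()):
            return False
        # Optimistic update: the resolver never learns the proxy's final decision.
        for k in write_keys:
            self.last_commit[k] = commit_version
        return True


def shard_of(key):
    # Assumed sharding: keys before "n" go to resolver 0, the rest to resolver 1.
    return 0 if key < "n" else 1


def commit(resolvers, read_set, write_keys, commit_version):
    votes = []
    for shard, resolver in enumerate(resolvers):
        reads = {k: v for k, v in read_set.items() if shard_of(k) == shard}
        writes = [k for k in write_keys if shard_of(k) == shard]
        votes.append(resolver.vote(reads, writes, commit_version))
    return all(votes)  # commit only if every resolver voted Yes


resolvers = [Resolver(), Resolver()]

# T0 writes "z" at version 1.
t0 = commit(resolvers, read_set={}, write_keys=["z"], commit_version=1)

# T1 read stale "z" (version 0). Resolver 0 votes Yes and records "a"; resolver 1 votes No.
t1 = commit(resolvers, read_set={"a": 0, "z": 0}, write_keys=["a", "z"], commit_version=2)

# T2 read "a" at version 0, which is still its real latest value because T1 aborted.
# Resolver 0 nevertheless believes "a" was written at version 2 and votes No.
t2 = commit(resolvers, read_set={"a": 0}, write_keys=["a"], commit_version=3)
```

T2 is aborted although no committed transaction conflicts with it. The outcome is still safe, since T2 can simply be retried; the cost is purely wasted work.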

As you said above, running a sort of incomplete 2PC between a proxy and the resolvers may later abort transactions that should not be aborted, which is not desirable. What is the benefit of avoiding 2PC here? 
Note that anything we put into the cache of a resolver lasts for only 5 seconds. After 5 seconds, it is as if we never added it in the first place. If we really wanted to avoid any false abort in that short time, we would have to implement a locking mechanism on the resolvers to keep later in-flight transactions pending until we learn the final decision for an earlier transaction. This locking comes with its own overhead, and FoundationDB avoids it. FoundationDB's philosophy, like that of other optimistic systems, is to provide high performance for low-contention workloads. 

After the proxy decides to commit a transaction, it appends the write set of the transaction to the transaction logs. Note that, for a single transaction, the proxy might need to talk to several transaction logs, as the transaction logs may be sharded. However, in this post, assume we always have only one shard for the transaction log, and it is replicated for higher availability. FoundationDB does not use consensus algorithms such as Paxos or Raft for replicating the transaction logs. Instead, it simply requires every append to be written synchronously to ALL replicas of a transaction log. 

But then how can we handle replica failures? If one of the replicas goes down, will the system be unavailable? 
In the case of a replica failure, FoundationDB starts a new replica to keep the number of healthy replicas at the desired replication factor. We will talk about fault tolerance later in this post. 

Note that, as with all WALs, the data on the transaction logs must be durable. Thus, a transaction log fsyncs the data to disk before returning to the proxy. After the proxy has appended the updates to the transaction logs, it returns success to the client. Later, the storage servers pull mutations from the transaction logs and apply them. 

Get Read Version

As we said above, a client gets a read version from FoundationDB for each transaction and includes it in every read request it sends to storage servers. The storage servers then make sure not to return any version with a timestamp higher than the read version. Now let's see how FoundationDB provides this read version. 

To get the read version, the client asks one of the proxies. That proxy asks the other proxies for the highest commit timestamp they have assigned to a transaction. It then takes the maximum of the timestamps it received from the other proxies and its own highest commit timestamp, and returns it to the client. 

So to get the read version for each transaction, the proxy has to talk to other proxies? Isn't it a scalability issue? 
Like getting the commit timestamp from the master, here also we can solve the scalability issue by batching the transactions. So the proxy can batch many read version requests, ask other proxies and calculate the max timestamp, and send it to all clients as the read version. All of them will receive the same read version. 
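The batching idea can be sketched as follows. The Proxy class, its method names, and the peer_requests counter are hypothetical and exist only to show that one round of peer requests can serve many clients.

```python
class Proxy:
    """Tracks the highest commit timestamp this proxy has handled (hypothetical)."""

    def __init__(self, name):
        self.name = name
        self.highest_commit = 0
        self.peer_requests = 0  # how many times other proxies have asked us

    def record_commit(self, commit_version):
        self.highest_commit = max(self.highest_commit, commit_version)

    def report_highest_commit(self):
        self.peer_requests += 1
        return self.highest_commit

    def get_read_versions(self, peers, batch_size):
        # One round of requests to the peers serves the whole batch of waiting clients.
        candidates = [self.highest_commit] + [p.report_highest_commit() for p in peers]
        read_version = max(candidates)
        return [read_version] * batch_size


a, b, c = Proxy("A"), Proxy("B"), Proxy("C")
a.record_commit(100)
b.record_commit(120)
c.record_commit(90)

# Three clients are waiting on proxy A; all of them get the same read version, 120.
read_versions = a.get_read_versions(peers=[b, c], batch_size=3)
```

Each peer was contacted once, regardless of how many clients were in the batch, which is why the cost per transaction shrinks as the load grows.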

What is the purpose of using the maximum of all previously committed timestamps as the read version? 
As we explained in this post, to guarantee strict serializability, we need to make sure that when T1 starts after T2 commits, T1 can see the effect of T2. Since FoundationDB does not make values with timestamps greater than the transaction's read version visible, we need to make sure the read version is not less than the commit timestamp assigned to any previously committed transaction. 

But suppose proxy A asks proxy B for its highest committed timestamp, and after that, but before proxy A returns the read version to the client, another commit occurs on proxy B. Now the read version is not greater than all previously committed transactions. 
The read version has to be larger than all committed versions before the transaction starts. The new commit to proxy B that you described above occurs after the original transaction started. Thus, the read version can be smaller than the new committed version on proxy B. 

Strict Serializability

FoundationDB guarantees strict serializability through the following invariants:
  1. The writes of transactions are separated by their commit versions and no two transactions have the same commit version. 
  2. All reads of a transaction happen at a single point in time determined by the read version. Thus, newer versions are ignored, and a transaction is guaranteed to read the most recent versions committed before the given time. 
  3. Commit timestamps are monotonically increasing. 
  4. Both the read and commit timestamps of a transaction T are guaranteed to be greater than the commit timestamp of any transaction committed before T starts. 
Invariants 1 and 2 guarantee the isolation part of the strict serializability. Invariants 3 and 4 guarantee the linearizability part. 

How is linearizability guaranteed when we have multiple proxies? 
Consider this example: Suppose we have two proxies, P1 and P2, and two transactions, T1 and T2, both writing the same key. T1 goes to P1 and T2 goes to P2. P1 gets commit timestamp 1 from the master, and P2 gets timestamp 2 from the master. Since P1 is slow, P2 goes ahead and commits its transaction with timestamp 2. Then, P1 commits its transaction.
Now we have this: T1 has committed later, but its timestamp is less than that of T2, i.e. commit timestamps do not respect the real-time order. If we now read the value of the key, we read the value written by T2, although T1 is the last transaction that committed. Doesn't this violate linearizability? 
There is nothing wrong with the scenario above. Note that linearizability requires the writes to appear to take effect at a single point in time between the start and the end of the operation. Thus, we are allowed to put that point anywhere between the time the client sends its request and the time the system returns the result. In this example, although T1 commits later, a linearizable system is allowed to put the point in time when T1 writes its mutation before that of T2. This situation happens because T1 and T2 are actually concurrent transactions, and a linearizable system is allowed to order them any way it likes. 

Figure 2. For T1 and T2 on the left, we can assume the actual point in time when the mutations of T1 take place (A) is earlier than that of T2 (B). Thus, although T1 commits after T2,  a linearizable store is allowed to order T2 after T1. On the other hand, for T1 and T2 on the right, there is no way that we can put A and B such that A < B. Thus, a linearizable system must order T2 before T1.

In other words, linearizability is concerned with non-concurrent operations, i.e. operations whose request-to-return intervals do not overlap. If T1 starts after T2 has committed and returned to the client, there is no way that we can justify the writes of T2 appearing to take place after those of T1. In that case, the system must make sure that the timestamp of T1 is greater than that of T2, which FoundationDB guarantees. 
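The real-time constraint can be expressed as a small check on request-to-return intervals. The Op type and the concrete times below are made up for illustration; they mirror the two cases in Figure 2.

```python
from typing import NamedTuple


class Op(NamedTuple):
    name: str
    start: float  # time the client sent the request
    end: float    # time the client received the result


def must_precede(a, b):
    """True if a returned before b started, so a must be ordered before b."""
    return a.end < b.start


def concurrent(a, b):
    """True if the intervals overlap, so either order is acceptable."""
    return not must_precede(a, b) and not must_precede(b, a)


# Left case: T1 starts first but returns last, and the two intervals overlap.
left_t1 = Op("T1", start=0, end=10)
left_t2 = Op("T2", start=2, end=6)
left_is_concurrent = concurrent(left_t1, left_t2)      # True: T1 may be ordered before T2

# Right case: T2 has already returned when T1 starts.
right_t2 = Op("T2", start=0, end=4)
right_t1 = Op("T1", start=5, end=9)
right_is_forced = must_precede(right_t2, right_t1)     # True: T2 must be ordered before T1
```

Only the second case restricts the timestamps: T1 must get a commit timestamp greater than that of T2.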


Each storage server is replicated. Thus, when a storage server dies, the client can go to the other replicas of that shard to read, and handling the crash of a storage server is straightforward. In this section, we focus on crash and recovery for components in the transaction subsystem. The transaction subsystem includes the following components: master, proxies, resolvers, and transaction logs. 

The master continuously checks the health of the proxies, resolvers, and transaction logs. If any of them crashes, the master terminates itself. Once the cluster controller finds out that the master has crashed or terminated, it picks a new master. The new master recruits a new instance of every component in the transaction subsystem and starts the recovery process. Note that a single failure in any component of the transaction subsystem causes all other components of the transaction subsystem to be replaced with new processes. Also, the failure of the cluster controller itself causes the coordinators to pick a new cluster controller and start the recovery process. Thus, FoundationDB uses a unified recovery process for a failure in any of these components. Now, let's see what this recovery process is. 

Recovery Process

The new master replaces all components in the transaction subsystem with new instances. Thus, we will have new proxies, new resolvers, and new transaction logs. During the recovery, the system does not accept new requests. Before resuming the traffic, the new master first figures out the timestamp at which it takes ownership as the new master. This timestamp is called the recovery version. The requirement for the recovery version is that it must not be less than the timestamp of any previously committed transaction. Let's see how the master finds the recovery version. 

The new master first asks the coordinators where the previous transaction logs are. It then asks each transaction log for the following information: 
  1. Known Committed Version (KCV): KCV is the highest version that a proxy has appended to all replicas of the transaction log. For example, when KCV is 100, we know that any transaction with a commit timestamp less than or equal to 100 is appended to all replicas of the transaction log. A transaction log learns this value periodically from the proxies. 
  2. Durable Version (DV): DV is the maximum timestamp that has been appended to this transaction log. 
When we have multiple proxies, how does a transaction log compute the KCV? 
This is not covered in FoundationDB official documents, but logically, I think the transaction log should use the minimum of KCVs that it received from all proxies as its own KCV, because as we said, a transaction log must be certain that the mutations with versions less than the KCV are appended to all replicas of a transaction log. 

Upon a request from the master for its KCV and DV, a transaction log locks itself, so it won't accept any new transactions. After getting the responses from the transaction logs, the master picks the minimum of all returned DVs as its recovery version. Let's see why. As we said earlier, FoundationDB requires a quorum of size N for appending to the transaction logs (where N is the transaction log replication factor). Thus, every committed transaction has been appended to all replicas, which means the DV of each replica is at least the timestamp of the latest committed transaction. Therefore, by taking the minimum of the DVs, we can be sure that the selected timestamp is not less than the timestamp of any previously committed transaction. Any future timestamp given by the master will be greater than the chosen recovery version. Figure 3 shows an example. Transaction log 1 has crashed and is not responding. The master receives 110 and 120 from the remaining replicas and takes 110 as the recovery version. 

Figure 3. The new master picks the minimum of the DVs as the recovery version. It also picks the maximum of the KCVs as the last epoch end version. Any transaction with a version greater than this value and less than or equal to the recovery version will be copied from the old transaction logs to the new transaction logs. 
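The two values picked by the new master can be computed as below. The function name and the (KCV, DV) tuple layout are hypothetical. The numbers come from the example, but which KCV belongs to which DV is an assumption made here, since only the figure shows the pairing.

```python
def recovery_versions(responses):
    """responses: (kcv, dv) pairs from the transaction logs that answered."""
    # Every committed transaction is on all replicas, so the smallest DV still covers it.
    recovery_version = min(dv for _, dv in responses)
    # The largest KCV is the highest version known to be on all replicas.
    last_epoch_end = max(kcv for kcv, _ in responses)
    return recovery_version, last_epoch_end


# Transaction log 1 has crashed, so only two replicas respond.
# Assumed pairing: one log reports (KCV=90, DV=110), the other (KCV=95, DV=120).
responses = [(90, 110), (95, 120)]
recovery_version, last_epoch_end = recovery_versions(responses)
# recovery_version is 110 and last_epoch_end is 95
```

Taking the minimum works only because a commit requires an append to every replica; with a majority quorum, the minimum DV could miss committed transactions.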

Now, let's see what happens to the storage servers during this transition of the transaction subsystem. Before the recovery, the storage servers were pulling writes from the transaction logs and applying them. Thus, in the example above, a storage server may have applied transaction 120 before the failure happened. This is a problem because, according to the recovery version, the ownership of the subsystem changed to the new master at time 110. Thus, any data with a version greater than the recovery version that was written by the previous generation of the transaction subsystem and applied by storage servers must be rolled back. In the example above, the storage server will roll back the mutations of transaction 120. 
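On a storage server, the rollback amounts to discarding every version newer than the recovery version. A minimal sketch, assuming a key's history is kept as a list of (version, value) pairs; the roll_back helper is hypothetical.

```python
def roll_back(version_chain, recovery_version):
    """Keep only the entries written at or before the recovery version."""
    return [(version, value) for version, value in version_chain
            if version <= recovery_version]


# A storage server that pulled from a replica holding transaction 120.
history = [(100, "a"), (110, "b"), (120, "c")]
after_recovery = roll_back(history, recovery_version=110)
# after_recovery is [(100, "a"), (110, "b")]
```

The entry written at version 110 survives because the recovery version itself belongs to the old generation.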

What if a transaction T has read a version written by transaction 120 in the example above before we roll back? 
To read a version written by transaction 120, transaction T must have received a read version greater than or equal to 120. A transaction cannot commit unless the proxy has appended its mutations to all of the transaction logs. Thus, if T managed to get a read version >= 120, then all DVs must be at least 120, which is not the case. Thus, there could not be such a transaction. 

After finding the recovery version, the new transaction system is ready to accept new transactions, but before that, FoundationDB artificially advances the time by 90 seconds, causing all in-flight transactions to abort with a "transaction too old" error [3]. Note that even read-only transactions are required to commit before returning to the clients. Thus, in-flight read-only transactions will also abort. The storage servers continue reading from the old transaction logs. After pulling all mutations from an old transaction log, a storage server switches to the new transaction logs. 

There are cases where, due to a failure, FoundationDB returns result_unknown for transactions that are in the final stage of committing, i.e. appending to the transaction logs. In Figure 3, a proxy appended transaction 100 to all three replicas, but suppose the proxy crashes before returning success to the client. In this example, transaction 100 will be applied to the storage servers, as the recovery version is 110. Now, consider the same situation for transaction 120; suppose a proxy appends 120 to transaction log 3 and then crashes. Unlike transaction 100, transaction 120 will abort. From the client's side, these situations look the same: the proxy crashes before returning the result. However, one transaction will commit and be applied on the storage servers, and the other will abort and be rolled back. In this situation (and other cases like it), the client will receive result_unknown.

But what is the client supposed to do with this "result_unknown"? Suppose this is an important transaction such as a payment; doesn't the application need to know whether the payment was finally successful or not?
The client has to retry its transaction. To satisfy application-level consistency (e.g. to avoid redundant payments), the application developers have to make sure their transactions are idempotent, i.e., applying the same transaction more than once has the same effect as applying it once. One way is to assign each transaction a Global Transaction Identifier (GTID) and deduplicate mutations with the same GTID. Note that even though FoundationDB may return result_unknown for some transactions, that does not mean it relaxes its ACID guarantees for those transactions. In other words, although the clients do not know the result, those transactions will still be either atomically applied or discarded, and if committed, they respect strict serializability. 
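The GTID idea can be sketched with an in-memory stand-in for the database. The Ledger class and apply_payment are hypothetical. In a real application, the assumption is that the GTID check and the balance update would run inside the same transaction, so that they commit or abort together.

```python
import uuid


class Ledger:
    """In-memory stand-in for application state kept in the database (hypothetical)."""

    def __init__(self):
        self.balance = 0
        self.applied_gtids = set()

    def apply_payment(self, gtid, amount):
        # Assumed to run as one atomic transaction: check the GTID, then apply.
        if gtid in self.applied_gtids:
            return False  # already applied by an earlier attempt, do nothing
        self.applied_gtids.add(gtid)
        self.balance += amount
        return True


ledger = Ledger()
gtid = str(uuid.uuid4())  # chosen once by the client, reused on every retry

first_attempt = ledger.apply_payment(gtid, 50)  # commits, but suppose the client sees result_unknown
retry = ledger.apply_payment(gtid, 50)          # the retry finds the GTID and becomes a no-op
```

The balance ends up at 50 whether the first attempt actually committed or not, which is exactly what the client needs when it cannot tell the two outcomes apart.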

In Figure 3, the responding transaction logs return 90 and 95 as their KCVs. A returned KCV of 95 means that all transactions with timestamps less than or equal to 95 have been appended to all replicas. However, for transactions 96 through 110, we don't have enough information to know whether they were appended to transaction log 1 or not, because that transaction log has crashed and is not responding. For transactions in the range [0-95], we know we have 3 copies (the required replication factor). All future transactions will be appended to all 3 new transaction logs, so we will have 3 copies of them as well. However, for transactions [96-110], we are not sure whether we have the required number of copies, which is 3 in this example. To make sure that we satisfy the required replication factor, the master copies all transactions in the range [max(KCVs)+1, recovery version] to the new transaction logs. In this example, it copies [96-110] to the new transaction logs. This way, we are sure that we have at least 3 copies of every transaction in our transaction logs. 
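Selecting the transactions to copy is a range filter over the versions held by a surviving old transaction log. The versions_to_copy helper and the list of versions are made up for illustration; only the bounds 95 and 110 come from the example.

```python
def versions_to_copy(old_log_versions, max_kcv, recovery_version):
    """Versions in [max_kcv + 1, recovery_version] that must be re-replicated."""
    return [v for v in old_log_versions if max_kcv < v <= recovery_version]


# Versions found on a surviving old transaction log (hypothetical contents).
old_log_versions = [90, 95, 96, 100, 110, 120]
to_copy = versions_to_copy(old_log_versions, max_kcv=95, recovery_version=110)
# to_copy is [96, 100, 110]; 120 is beyond the recovery version and is discarded
```

Versions up to 95 are skipped because they are already known to be on all replicas, and 120 is skipped because it belongs to a transaction that never committed.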

In summary, 

  • FoundationDB uses MVCC to provide isolated snapshots to read, and by making sure the snapshot time is always greater than the timestamp of any previously committed version it guarantees linearizability. 
  • To replicate transaction logs, FoundationDB does not use Paxos or Raft. Instead, it requires a quorum size equal to the replication factor. In the case of replica failure, it creates a new replica to have the required quorum size. 
  • All failures of the transaction subsystem are handled uniformly by a single recovery process. 


[1] Evan Tschannen, "Technical Overview of FoundationDB", https://www.youtube.com/watch?v=EMwhsGsxfPU&feature=youtu.be


XiaoK said…
Q1: "I think the transaction log should use the minimum of KCVs that it received from all proxies as its own KCV." If one proxy has no update for a long time, can the KCV no longer advance?
Q2: In the paper, Section 2.4.4: "the Sequencer knows the previous epoch has committed transactions up to the maximum of all KCVs". Why the maximum?
