ByteGraph: A Graph Database for TikTok

ByteGraph is a distributed graph database developed by ByteDance, the company behind the popular social network TikTok. As a large social media platform, ByteDance handles an enormous amount of graph data, and using a specialized graph database like ByteGraph is essential for efficiently managing and processing this data. In 2022, ByteDance published a paper on ByteGraph at the VLDB conference. In this post, I will provide an overview of my understanding of ByteGraph and highlight some of the key points from the paper that I found interesting.

If you are new to graph databases and want to learn more about the benefits of using a specialized graph database, you may want to read an earlier post from this blog.

ByteGraph at a Glance

The following are the highlights of ByteGraph [1]:
  • Designed to handle OLAP, OLSP, and OLTP workloads.
  • Manages graphs with tens of billions of vertices and trillions of edges.
  • Architecture: three-layer architecture
    • BGE: query processing (parsing, planning, execution) and coordinating 2PC for distributed transactions. BGE talks to BGS instances, each hosting a shard, to execute a query.
    • BGS: Responsible for in-memory caching and log management. 
    • Persistent Storage Layer: An off-the-shelf key-value store such as RocksDB, responsible for durable storage.
  • Storage:
    • ByteGraph's native components are stateless; it relies on a key-value store as a black box for persistent storage.
    • Uses B-tree-like indexes called edge-trees. 
    • Automatically manages secondary indexes in response to the workload.
  • Query Processing:
    • Uses Gremlin as query language.
      • Graph algorithm steps (e.g., PageRank, single-source shortest paths) are not supported. ByteDance has a dedicated Pregel-like graph system for algorithms.
    • Uses centralized rule-based and cost-based optimizations.
  • Replication:
    • Uses the common scheme of single-leader replication within a region and multi-leader replication with eventual consistency across regions.
    • Uses Hybrid Logical Clocks (HLCs) for cross-region conflict resolution.
  • Transactions:
    • Supports read-committed transactions via 2PC and locking using write-intents without 2PL or MVCC.

Storage

Perhaps the most noticeable thing about the storage layer of ByteGraph is its reliance on a key-value store. The vertices are stored as ⟨key: ⟨vID, vType⟩, value: ⟨a list of properties⟩⟩ key-value pairs. The edges are stored as ⟨key: ⟨vID, vType, eType, dir⟩, value: root node of an edge-tree⟩. The edge-tree is sorted by the values of a property; the default property is the timestamp. ByteGraph analyzes the actual usage pattern and automatically decides to create new edge-tree indexes. When a vertex has multiple edge-trees, the edge key-value pair points to a forest of edge-trees instead of a single edge-tree. Note that the edge key includes eType; thus, edges are grouped by their type, and we have an edge-tree (or a forest of edge-trees) per type. This decision is made to handle super vertices with a huge number of edges: by grouping edges by type, we reduce read amplification when executing queries.
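
To make the layout more concrete, here is a minimal sketch in Python of how vertex and edge records could be keyed in the underlying key-value store. The names and the encoding are hypothetical; the paper does not specify the exact byte-level format.

    from typing import NamedTuple

    # Hypothetical key layouts; the actual encoding used by ByteGraph is not
    # given in the paper.

    class VertexKey(NamedTuple):
        v_id: int
        v_type: str            # e.g., "user"

    class EdgeTreeKey(NamedTuple):
        v_id: int
        v_type: str
        e_type: str            # e.g., "post", "like"; edges are grouped by type
        direction: str         # "out" or "in"; both directions are materialized

    # The value of a vertex key is the vertex's property list.
    vertex_value = {"name": "Alice", "age": 31}

    # The value of an edge key is the root node of an edge-tree (or a forest of
    # edge-trees if secondary indexes have been created), sorted by a property
    # such as the timestamp.
    edge_tree_value = {"sorted_by": "timestamp", "root_node_id": 42}

    kv_store = {
        VertexKey(1, "user"): vertex_value,
        EdgeTreeKey(1, "user", "post", "out"): edge_tree_value,
    }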

Nodes of an edge-tree have lower/upper bounds on their size; a node splits into two nodes when it reaches its upper bound, and two nodes merge if their sizes fall below the lower bound. For example, an edge-tree with a 1000/2000 lower/upper bound can hold 2000^3 = 8 billion edges with three layers, which is more than enough to store the adjacency list even for very large graphs. By configuring the lower/upper bounds, we can optimize the edge-tree for the actual workload. For example, read-heavy workloads may benefit from higher upper bounds, which result in reduced disk I/O.

Why does a higher upper bound reduce disk I/O?
A higher upper bound means larger nodes, which means fewer nodes in the tree. Thus, it is possible to load a larger portion of the tree into memory, and having more nodes in memory means less disk I/O. Ideally, we would like to load the root node and all meta nodes into memory. This way, we won't need any disk I/O to search, and we will need at most one disk I/O to read the actual data at the very end.
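
A quick back-of-the-envelope calculation (mine, not from the paper) shows how the upper bound controls both the capacity of a three-layer edge-tree and the number of non-leaf nodes we would like to keep in memory:

    def three_layer_capacity(upper_bound: int) -> int:
        """Maximum edges in a three-layer edge-tree: root fan-out x
        meta-node fan-out x edges per data node."""
        return upper_bound ** 3

    def non_leaf_nodes(upper_bound: int) -> int:
        """Root node plus meta nodes; keeping these in memory means a lookup
        needs at most one disk I/O (for the data node)."""
        return 1 + upper_bound

    for upper_bound in (1000, 2000):
        print(upper_bound, three_layer_capacity(upper_bound), non_leaf_nodes(upper_bound))
    # 1000 -> 1 billion edges, 1001 non-leaf nodes
    # 2000 -> 8 billion edges, 2001 non-leaf nodes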

Since ByteGraph uses a key-value store under the hood, you can think of ByteGraph as a layer on top of a key-value store (e.g., RocksDB) that exposes a graph API. In an earlier post, we talked about some disadvantages of using a non-graph NoSQL data store such as a document/key-value store for dealing with graph data. Using a specialized graph database that internally uses a key-value store can solve some of the problems of directly using a key-value store for graph data. For example, a graph layer on top of the key-value store can do the necessary work to keep the database consistent, e.g., when removing a vertex, it automatically removes all edges to that vertex. However, some problems still exist. For example, to efficiently traverse relations in both directions, we need to store each edge in both directions, once in the adjacency list of the source and once in the adjacency list of the destination. Otherwise, traversing in one of the directions would require a full scan. That's why the direction is part of the edge key (⟨vID, vType, eType, dir⟩). Thus, to write an edge, we have to add it to the edge-trees of both the source and the destination. Similarly, when we delete an edge, we have to remove it from both edge-trees. Thus, we need to do more work, and more work means more disk I/O, higher memory usage, a lower cache hit ratio, and eventually higher response times. Compare this to Neo4j, for example, which treats edges as first-class citizens in its storage: if you want to add an edge, you explicitly create an edge record, instead of implicitly storing it in the values of two key-value records. The edge record has its own identity and knows its source and destination (learn more).
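
As a rough illustration of that extra work, the following sketch (with hypothetical helpers, not the actual ByteGraph API) shows how a single logical edge write or delete turns into two key-value updates; for simplicity, each edge-tree is modeled as a plain dict:

    # Every edge lives in two places: the "out" tree of its source and the
    # "in" tree of its destination.

    def add_edge(kv_store, src, dst, e_type, props):
        # src and dst are (vID, vType) pairs
        out_key = (src[0], src[1], e_type, "out")
        in_key = (dst[0], dst[1], e_type, "in")
        kv_store.setdefault(out_key, {})[dst] = props   # source's adjacency list
        kv_store.setdefault(in_key, {})[src] = props    # destination's adjacency list

    def delete_edge(kv_store, src, dst, e_type):
        kv_store.get((src[0], src[1], e_type, "out"), {}).pop(dst, None)
        kv_store.get((dst[0], dst[1], e_type, "in"), {}).pop(src, None)

    store = {}
    add_edge(store, (1, "user"), (7, "post"), "post", {"timestamp": 100})
    delete_edge(store, (1, "user"), (7, "post"), "post")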

Another issue is index lookups. Yes, we can run 1-hop queries in O(1), but for 2-hop queries we need to look up each of the intermediate vertices, i.e., read the key-value store again. Compare this to Neo4j, for example, where once we have found the initial vertex, we don't need to look up any index again to find vertices when exploring the graph from that vertex. Note that I said to find vertices, not edges; using a local index (of size O(E), where E is the size of the adjacency list of the vertex) to look up the edges of a vertex is totally fine (I think in Neo4j, you can have such indexes). That would be similar to looking up the edge-tree in ByteGraph. The difference appears after you have found the desired edges and want to find the vertices on the other side of those edges (for a two-hop query, for example). In that situation, in Neo4j you have a direct pointer to those vertices, but in ByteGraph, you have to look up the key-value store again.


Having said that, ByteGraph has been successfully used for one of the major social networks, managing tens of billions of vertices and trillions of edges! As said in the last section of my earlier post, implementation approaches (e.g., whether or not to use an index) or asymptotic time complexity analysis may not be the most reliable way to judge the efficiency of a graph database for dealing with graph data. The paper does not compare ByteGraph and Neo4j. Although one of the strengths of ByteGraph is its distributed nature, seeing the performance of ByteGraph and Neo4j side by side, especially for multi-hop queries on a single node, would be very interesting.

Query Processing

ByteGraph uses both rule-based and cost-based query optimization. The number of edges in an edge-tree is stored in its root node, and that information can be used for cost-based optimization. For example, to check the existence of an edge between two vertices, it is more efficient to scan the edges of the vertex with fewer edges.
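
Here is a toy version of that edge-existence check (my own sketch; in ByteGraph the counts would come from the counters stored in the root nodes of the two edge-trees):

    class TinyGraph:
        """Toy in-memory stand-in for ByteGraph's edge-trees."""
        def __init__(self):
            self.out = {}   # (vertex, e_type) -> set of out-neighbors
            self.inc = {}   # (vertex, e_type) -> set of in-neighbors

        def add(self, src, dst, e_type):
            self.out.setdefault((src, e_type), set()).add(dst)
            self.inc.setdefault((dst, e_type), set()).add(src)

        def edge_exists(self, src, dst, e_type):
            out_edges = self.out.get((src, e_type), set())
            in_edges = self.inc.get((dst, e_type), set())
            # Cost-based choice: scan whichever adjacency list is smaller.
            if len(out_edges) <= len(in_edges):
                return dst in out_edges
            return src in in_edges

    g = TinyGraph()
    g.add("A", "B", "follows")
    print(g.edge_exists("A", "B", "follows"))   # True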

To explain the query processing, the paper provides an example. Let's understand this example. Suppose we want to execute the following Gremlin query: 

g.V(A,user).outE(post).has(date,within($range)).in(like).has(age,lt($age)).order().by(age, asc).properties(name, age)

The query says: Given vertex A of type user,
  1. Get all post edges going out of this vertex. 
  2. Filter the edges of step 1 by the date property and the given range.
  3. Then get all vertices that have a like edge pointing to the vertices at the other end of the edges filtered in step 2.
  4. Filter the vertices of step 3 by the age property and the given threshold.
  5. Order the vertices of step 4 by age.
  6. Retrieve the values of the name and age properties of the vertices of step 5.
So basically it says: give me the name and age of all users below a certain age who have liked posts published by this user within a certain date range. That is an example of a two-hop query; first a post edge and then a like edge. Note that the direction of the edges is reversed; the first edge is an outgoing edge, but the second edge is an incoming edge. Sketch 1 shows an example execution of this query.

Sketch 1. An example of executing this Gremlin query: g.V(A,user).outE(post).has(date,lt(200)).in(like).has(age,gt(30)).order().by(age,asc).properties(name,age)

This is how the query is processed:
  • First, a random BGE receives the query. The BGE parses the query and identifies the BGS instance that manages vertex A.
  • First RPC: Getting the first hop (intermediate) vertices
    • BGE sends a request to the BGS instance that manages vertex A to get the post edges going out of A. To save RPCs, the BGE pushes down the predicate on the date property. Thus, the filtering of the edges is done on the BGS instance instead of the BGE.
  • Second RPC: Getting the second hop vertices
    • BGE identifies BGS instances managing the first hop vertices and sends requests to them to look up, this time, like edges. 
    • BGS instances find the like edges pointing to the first hop vertices, thereby finding the second hop vertices, and return them to the BGE.
  • Third RPC: Filtering and reading second hop vertices
    • To filter these vertices and actually read the desired data (name and age), the BGE sends another round of requests to the BGS instances managing these vertices.
    • BGS instances access the vertices, filter them by the age property and retrieve the name and age and return the result to the BGE.
  • BGE aggregates the results by ordering them and returns them to the client.
Why doesn't the BGE push down the age predicate in the second RPC, as it did with the date predicate in the first RPC?
The first predicate is on an edge property (date), and the BGS instance managing vertex A has the edge properties in its edge-tree. On the other hand, the second predicate is on a vertex property (age). The BGS instances contacted in the second RPC can find the edges and the IDs of the vertices on the other side of these edges, but they don't have access to those vertices' properties, so they cannot filter them. Thus, they return everything to the BGE, and the BGE pushes down the predicate in the third RPC to the final BGS instances.

So to run this two-hop query, ByteGraph needs at least three RPCs. I think that is the minimum number of RPCs possible, and there is no way around it. The number of round trips may also be larger in the eager mode, where the BGE does not wait for all BGS instances to return before moving to the next step. On the bright side, since all edges of a vertex are guaranteed to be hosted on a single shard, the BGE only needs to talk to the subset of BGS instances that actually host the relevant edges, as opposed to broadcasting the request to all shards.
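
To make the flow above concrete, here is a rough sketch of the three RPC rounds as I understand them. All class and method names are hypothetical, local method calls stand in for RPCs, and the shards are toy in-memory objects:

    from collections import defaultdict

    class FakeShard:
        """Toy stand-in for a BGS instance: it holds the edge-trees and vertex
        properties of the vertices it owns."""
        def __init__(self):
            self.out_edges = defaultdict(list)   # (v, e_type) -> [(dst, edge_props)]
            self.in_edges = defaultdict(list)    # (v, e_type) -> [(src, edge_props)]
            self.vertices = {}                   # v -> vertex_props

        def scan_out_edges(self, v, e_type, date_range):
            lo, hi = date_range
            # Edge-property predicate (date) pushed down: filtered locally.
            return [dst for dst, p in self.out_edges[(v, e_type)]
                    if lo <= p["date"] <= hi]

        def scan_in_edges(self, vs, e_type):
            # Only edges and vertex IDs are visible here; no vertex-property filter.
            return [src for v in vs for src, _ in self.in_edges[(v, e_type)]]

        def read_vertices(self, vs, max_age):
            # Vertex-property predicate (age) pushed down with the projection.
            return [{"name": self.vertices[v]["name"], "age": self.vertices[v]["age"]}
                    for v in vs if self.vertices[v]["age"] < max_age]

    class FakeBGE:
        def __init__(self, shard_of):
            self.shard_of = shard_of             # vertex -> FakeShard

        def group_by_shard(self, vs):
            groups = defaultdict(list)
            for v in vs:
                groups[self.shard_of[v]].append(v)
            return groups

        def run(self, a, date_range, max_age):
            # RPC 1: the single shard hosting A's edge-trees.
            posts = self.shard_of[a].scan_out_edges(a, "post", date_range)
            # RPC 2: the shards of the first-hop vertices, following incoming "like" edges.
            likers = [u for shard, vs in self.group_by_shard(posts).items()
                      for u in shard.scan_in_edges(vs, "like")]
            # RPC 3: the shards of the second-hop vertices.
            rows = [r for shard, vs in self.group_by_shard(likers).items()
                    for r in shard.read_vertices(vs, max_age)]
            # Final aggregation (ordering) is done centrally on the BGE.
            return sorted(rows, key=lambda r: r["age"])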

Replication

The paper talks about fault-tolerance in three cases: 1) within the data center, 2) across data centers in the same region, and 3) across data centers in different regions. Note that by data center the paper means a cluster. 

Within a Data Center

The paper doesn't do a good job of explaining replication inside a data center. I assume the replication inside a data center is handled by the key-value store layer. When a BGS instance fails, the BGE instances find out about it through heartbeats and forward the traffic of the failed BGS instance to other BGS instances. Note that the state is stored in the key-value store, and BGS instances are stateless; thus, it is easy to redirect the traffic of a failed BGS instance to another BGS instance.


ByteGraph uses 2PC for transactions (see the next section). As we have talked about before (see this post), there are two points of no return in 2PC that must be persisted in stable storage: one is when a participant decides to say Yes to the coordinator, and the other is when the coordinator decides to commit the transaction. Specifically, the participants should never forget that they said Yes to a transaction, and the coordinator should never forget that it decided to abort/commit a transaction. The replication of the persistent storage must preserve these guarantees.

Cross Data Center in the same Region

For the clusters in the same region, ByteGraph uses single-leader replication in the key-value store layer. However, clients can write to any cluster in the region. The write operations are broadcast to the other clusters and are immediately visible after being applied to the BGS layer, but only the leader cluster can flush writes to the key-value store. That's why I said it is single-leader in the key-value store layer. A write operation is acknowledged to the client only after it is persisted in the key-value store of the leader cluster. It is possible that a write is persisted in the key-value store layer of the leader cluster but not yet replicated to the key-value store of a follower cluster. In that situation, if a client reads from the follower cluster and the read operation gets a cache miss, it will read stale data. Thus, this replication scheme guarantees only eventual consistency.
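
Here is how I picture that write path, as a hedged sketch under the assumptions above (the broadcast and replication mechanics are my own simplification, not something the paper spells out):

    class Cluster:
        """Toy model of one data center: a BGS cache plus a key-value store."""
        def __init__(self, is_leader):
            self.is_leader = is_leader
            self.cache = {}      # BGS in-memory layer
            self.kv = {}         # persistent key-value store

        def read(self, key):
            if key in self.cache:
                return self.cache[key]
            return self.kv.get(key)      # on a cache miss, may return stale data

    def write(clusters, key, value):
        # The write is broadcast and applied to every BGS cache, so it is
        # immediately visible region-wide ...
        for c in clusters:
            c.cache[key] = value
        # ... but only the leader cluster flushes it to its key-value store, and
        # the write is acknowledged only after that flush. Asynchronous KV-level
        # replication to followers is simulated here by simply not doing it.
        leader = next(c for c in clusters if c.is_leader)
        leader.kv[key] = value
        return "ack"

    leader, follower = Cluster(True), Cluster(False)
    write([leader, follower], "v:1", {"name": "Alice"})
    follower.cache.clear()               # e.g., eviction or a BGS restart
    print(follower.read("v:1"))          # None: a stale read (eventual consistency)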

The paper does not explain what leader-election approach it uses in case of a leader failure.

Cross-region Replication

For cross-region replication, ByteGraph uses a multi-leader approach:
  • Each region has its single-leader replication as explained above. 
  • Leader clusters replicate their writes to each other.
  • ByteGraph uses last-write-wins with Hybrid Logical Clocks (HLCs) to resolve cross-region conflicts (a minimal sketch follows this list). To learn more about conflict resolution using HLCs, you can refer to an earlier post from this blog on Eventual Consistency and Conflict Resolution (part 2).
  • Deletes are replicated using HLC-timestamped tombstones.
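
Below is a minimal sketch of last-write-wins conflict resolution with HLC timestamps, including tombstones for deletes. The HLC here is heavily simplified (it omits the clock update on receiving remote timestamps), and the region ID as a tie-breaker is my own assumption:

    import time

    class HLC:
        """Simplified Hybrid Logical Clock producing (physical_ms, logical, region)
        timestamps. The receive-side clock update is omitted for brevity."""
        def __init__(self, region):
            self.region = region
            self.pt, self.lc = 0, 0

        def now(self):
            wall = int(time.time() * 1000)
            if wall > self.pt:
                self.pt, self.lc = wall, 0
            else:
                self.lc += 1                 # same millisecond: bump the logical part
            return (self.pt, self.lc, self.region)

    def apply_replicated_write(store, key, value, ts, tombstone=False):
        """Last-write-wins: keep whichever version carries the larger HLC
        timestamp. Deletes are tombstones carrying their own timestamp."""
        current = store.get(key)
        if current is None or ts > current["ts"]:
            store[key] = {"value": None if tombstone else value,
                          "ts": ts, "tombstone": tombstone}

    store = {}
    clock = HLC(region="us")
    apply_replicated_write(store, "edge:A->B", {"weight": 1}, clock.now())
    apply_replicated_write(store, "edge:A->B", None, clock.now(), tombstone=True)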

Transactions

With ByteGraph, we can use transactions to perform atomic updates, e.g., to add several edges atomically. Regarding the isolation level, it provides read committed, which means read operations are guaranteed to read only committed values, and write operations only overwrite committed values; so there are no dirty reads or dirty writes.


ByteGraph's transaction protocol is fairly simple. It uses 2PC for atomic commit, with the BGE instance that receives the transaction request acting as the 2PC coordinator and the BGS instances involved in the transaction acting as the participants. (To learn more about 2PC, you can refer to my earlier post on Serializability.) Since ByteGraph does not aim at the serializability level, it does not run full-blown two-phase locking (2PL) on shards as databases such as Spanner do. Instead, it simply adds provisional values as write-intents to the keys (a sketch follows the two points below).

  • write-intents are not visible to read operations. (no dirty read)
  • Unlike FoundationDB which uses multi-versioning, in ByteGraph, each key can have at most one write-intent which effectively locks the key after the first write-intent. (no dirty write)
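
A hedged sketch of what a BGS participant's side of this protocol might look like (hypothetical names and structure; the paper does not describe the implementation at this level, and durability of the prepared state is omitted):

    class KeyState:
        def __init__(self):
            self.committed = None    # last committed value (visible to reads)
            self.intent = None       # at most one pending write-intent
            self.intent_txn = None

    class Participant:
        """Toy BGS participant in 2PC with write-intents instead of 2PL/MVCC."""
        def __init__(self):
            self.keys = {}

        def read(self, key):
            # Reads never see write-intents -> no dirty reads (read committed).
            state = self.keys.get(key)
            return state.committed if state else None

        def prepare(self, txn_id, writes):
            """Phase 1: place a write-intent on every key; vote No if any key
            already carries another transaction's intent. (The Yes vote and the
            intents would also have to be made durable at this point.)"""
            states = [self.keys.setdefault(k, KeyState()) for k in writes]
            if any(s.intent_txn not in (None, txn_id) for s in states):
                return "no"                  # the key is locked -> no dirty writes
            for (k, v), s in zip(writes.items(), states):
                s.intent, s.intent_txn = v, txn_id
            return "yes"

        def finish(self, txn_id, commit):
            """Phase 2: promote the intents to committed values, or drop them."""
            for s in self.keys.values():
                if s.intent_txn == txn_id:
                    if commit:
                        s.committed = s.intent
                    s.intent, s.intent_txn = None, None
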
Running 2PC plus locking keys with write-intents seems to be enough. Why might anything more than that be needed for transactions?
A transaction may violate serializability even when it is guaranteed to read/write only committed values. To understand how, consider this example. Suppose we use transaction T1 to get the list of all users that user A follows (i.e., to get the list of followees). Suppose that, concurrent with T1, transaction T2 is being executed that removes the follows edge A → B and adds a follows edge A → C. Now, consider the situation where T1 reads A → B before T2 deletes it, and reads A → C after T2 adds it. Thus, T1 finds both A → B and A → C in the list of followees. In this case, T1 has read only committed values (A → B was committed earlier by some other transaction, and A → C is committed by T2), but serializability is violated, because with serializability we should either order the transactions as T1, T2, which results in seeing only A → B, or as T2, T1, which results in seeing only A → C in the list of followees. This anomaly is called read skew.
Sketch 2.  An example of a read skew with read committed isolation.

Note that although we still wouldn't achieve serializability, the particular scenario explained above could have been avoided if we had locked the entire follows edge-tree of user A. As far as I understand, that is not the case with ByteGraph, i.e., the locking is done at the vertex/edge level.

Conclusion

In this post, we examined ByteGraph, a graph database developed by ByteDance to handle a range of workloads, including OLAP, OLSP, and OLTP. The database relies on a key-value store for persistent storage and uses edge-trees, a data structure similar to B-trees, to index the edges of vertices. These edge-trees can be organized into a forest to enable indexing based on multiple properties. ByteGraph dynamically manages index creation based on access patterns. It uses Gremlin as its query language. The query processing is mainly carried out in a centralized manner on BGE instances with predicate pushdown whenever possible to save RPCs. It guarantees eventual consistency and uses HLC timestamps for conflict resolution. ByteGraph also provides atomic read-committed transactions via 2PC.

Reference

[1] Changji Li, Hongzhi Chen, Shuai Zhang, Yingqian Hu, Chao Chen, Zhenjie Zhang, Meng Li et al. "ByteGraph: a high-performance distributed graph database in ByteDance." Proceedings of the VLDB Endowment 15, no. 12 (2022): 3306-3318.
