Dgraph uses RAFT whenever consensus among a distributed set of servers is required, such as ensuring that a transaction has been properly committed, or determining the proper timestamp for a read or write. Each zero or alpha group uses RAFT to elect leaders.
This section aims to explain the RAFT consensus algorithm in simple terms. The idea is to give you just enough to understand the basic concepts, without explaining in detail why the algorithm is correct. For a detailed explanation of RAFT, please read the original thesis by Diego Ongaro.
Each election cycle is considered a term, during which there is a single leader (just like in a democracy). When a new election starts, the term number is increased. This is straightforward and obvious, but it is a critical factor for the correctness of the algorithm.
In rare cases, if no leader could be elected within an ElectionTimeout, that term can end without a leader.
Each server in the cluster can be in one of the following three states: leader, follower, or candidate.
Generally, the servers are in the leader or follower state. When the leader crashes or communication breaks down, the followers wait for an election timeout before converting to candidates. The election timeout is randomized, which allows one of them to declare candidacy before the others. The candidate votes for itself and waits for the majority of the cluster to vote for it as well. If a follower hears from a candidate with a higher term than the current (dead in this case) leader, it votes for that candidate. The candidate who gets a majority of the votes wins the election and becomes the leader.
The leader then tells the rest of the cluster about the result (Heartbeat Communication), and the other candidates become followers. The cluster then goes back to the leader-follower model.
A leader could revert to being a follower without an election if it finds another leader in the cluster with a higher term. This might happen in rare cases (network partitions).
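The state changes described above can be summarized as a small transition table. This is a minimal sketch, not Dgraph code: the event names (`election_timeout`, `won_majority`, `saw_leader`, `saw_higher_term`) are hypothetical labels chosen for illustration.

```python
from enum import Enum


class State(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"


# Hypothetical event names; they are labels for this sketch, not Dgraph identifiers.
TRANSITIONS = {
    (State.FOLLOWER, "election_timeout"): State.CANDIDATE,
    # A split vote ends the term without a leader; the candidate retries in a new term.
    (State.CANDIDATE, "election_timeout"): State.CANDIDATE,
    (State.CANDIDATE, "won_majority"): State.LEADER,
    (State.CANDIDATE, "saw_leader"): State.FOLLOWER,
    (State.CANDIDATE, "saw_higher_term"): State.FOLLOWER,
    (State.LEADER, "saw_higher_term"): State.FOLLOWER,
}


def next_state(state, event):
    """Return the state after `event`; events that do not apply leave the state unchanged."""
    return TRANSITIONS.get((state, event), state)
```

For example, `next_state(State.LEADER, "saw_higher_term")` yields `State.FOLLOWER`, which is the "revert without an election" case.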
There is unidirectional RPC communication, from the leader to all/any followers. The followers never ping the leader. The leader sends AppendEntries messages to the followers with logs containing state updates. When the leader sends AppendEntries with zero logs (updates), that’s considered a Heartbeat. The leader sends all followers Heartbeats at regular intervals.
If a follower doesn’t receive a Heartbeat for ElectionTimeout duration (generally between 150ms and 300ms), the leader may be down, so the follower converts its state to candidate (as mentioned in Server States).
It then requests votes by sending a RequestVote call to the other servers. If it gets votes from the majority, the candidate becomes the leader. On becoming leader, it sends Heartbeats to all other servers to establish its authority.
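The sketch below illustrates the two ideas in this election flow: each server picks a random timeout in the 150ms to 300ms range, and a candidate wins only with votes from more than half of the cluster. The function names and the fixed seed are assumptions made for the example, not part of Dgraph.

```python
import random


def pick_election_timeout(rng, low_ms=150, high_ms=300):
    """Pick a randomized election timeout in milliseconds."""
    return rng.uniform(low_ms, high_ms)


def wins_election(votes_received, cluster_size):
    """A candidate needs votes from a strict majority (its own vote included)."""
    return votes_received > cluster_size // 2


# Seeded only so the example is repeatable.
rng = random.Random(42)
timeouts = {name: pick_election_timeout(rng) for name in ("a", "b", "c")}

# The server whose timeout expires first is the first to become a candidate.
first_candidate = min(timeouts, key=timeouts.get)
```

Because the timeouts differ, one server usually starts its campaign before the others notice the leader is gone, which makes split votes unlikely.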
Every communication request contains a term number. If a server receives a request with a stale term number, it rejects the request.
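A minimal sketch of that term check follows. The `Server` class and its fields are hypothetical; the sketch also shows the related rule that a server seeing a higher term adopts it and steps down to follower.

```python
from dataclasses import dataclass


@dataclass
class Server:
    current_term: int = 0
    state: str = "follower"

    def accept_request(self, request_term):
        """Return False for stale requests; adopt any newer term that is seen."""
        if request_term < self.current_term:
            return False  # stale sender: reject
        if request_term > self.current_term:
            # A newer term exists, so this server can no longer be leader or candidate.
            self.current_term = request_term
            self.state = "follower"
        return True
```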
Dgraph uses LSM Trees, so we call commits or updates “Log Entries.” Log Entries are numbered sequentially and contain a term number. An Entry is considered committed if it has been replicated (and stored) by a majority of the servers.
On being notified of the results of a client request (which is often processed on other servers), the leader does four things to coordinate RAFT consensus (this is also called Log Replication):
- Appends and persists to its log.
- Issues AppendEntries in parallel to other servers.
- Monitors for the majority to report it is replicated, after which it considers the entry committed and applies it to the leader’s state machine.
- Notifies followers that the entry is committed so that they can apply it to their state machines.
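The "replicated by a majority" rule in the steps above can be sketched as a small calculation over the highest index each server has stored. The function name and the `match_index` list are assumptions for this example; full RAFT additionally requires the entry to belong to the leader's current term before counting replicas, which is omitted here.

```python
def commit_index(match_index, cluster_size):
    """Highest log index stored on a majority of servers.

    `match_index` lists, for every server including the leader, the highest
    log index known to be persisted on that server.
    """
    majority = cluster_size // 2 + 1
    # After sorting in descending order, the value at position majority - 1
    # is held by at least `majority` servers.
    return sorted(match_index, reverse=True)[majority - 1]
```

With five servers at indexes 7, 4, 4, 2 and 1, three servers hold index 4 or higher, so entries up to index 4 are committed.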
A leader never overwrites or deletes its entries. RAFT guarantees that if an entry is committed, all future leaders will have it. A leader can, however, forcibly overwrite the followers’ logs if necessary, so that they match the leader’s log.
Each server persists its current term and vote, so it doesn’t end up voting twice in the same term.
On receiving a RequestVote RPC, the server denies its vote if its log is more up-to-date than the candidate’s. It also denies a vote if a minimum ElectionTimeout hasn’t passed since the last Heartbeat from the leader. Otherwise, it gives a vote and resets its ElectionTimeout timer.
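The up-to-date property of logs is determined as follows:
- Term number comparison: the log whose last entry has the higher term is more up-to-date.
- Index number or log length comparison: if the last terms are equal, the longer log is more up-to-date.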
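A compact way to express this comparison is to treat each log's last entry as a `(term, index)` pair and compare the pairs lexicographically. The sketch below assumes that representation and hypothetical parameter names; it also includes the one-vote-per-term rule, but leaves out the minimum ElectionTimeout check.

```python
def grant_vote(voted_for, candidate_id, candidate_last, my_last):
    """Decide whether to vote for `candidate_id` in the current term.

    `candidate_last` and `my_last` are (last_term, last_index) tuples.
    Tuple comparison checks the term first and the index only on a tie.
    """
    already_voted_for_other = voted_for is not None and voted_for != candidate_id
    candidate_is_current = candidate_last >= my_last
    return (not already_voted_for_other) and candidate_is_current
```

A candidate whose last entry is `(3, 10)` beats a voter at `(2, 50)`: the higher term wins even though the voter's log is longer.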
Raft only allows single-server changes, i.e. only one server can be added or deleted at a time.
This is achieved by cluster configuration changes. Cluster configurations are communicated using special entries in AppendEntries.
The significant difference between how cluster configuration changes and typical Log Entries are applied is that the followers don’t wait for a commitment confirmation from the leader before enabling the new configuration.
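One reason commonly given for the single-server restriction is that any majority of the old configuration and any majority of the new one then share at least one server, so two leaders cannot be elected independently during the change. The brute-force check below is only an illustration with hypothetical helper names; it enumerates the smallest possible majorities, since larger ones are supersets of these.

```python
from itertools import combinations


def majorities(servers):
    """All smallest majority subsets of `servers`."""
    need = len(servers) // 2 + 1
    return [set(group) for group in combinations(servers, need)]


def always_overlap(old, new):
    """True if every majority of `old` shares a server with every majority of `new`."""
    return all(a & b for a in majorities(old) for b in majorities(new))


one_added = always_overlap(["a", "b", "c"], ["a", "b", "c", "d"])
two_added = always_overlap(["a", "b", "c"], ["a", "b", "c", "d", "e"])
```

Here `one_added` is `True`, while `two_added` is `False`: with two servers added at once, `{a, b}` is a majority of the old cluster and `{c, d, e}` is a majority of the new one, and they have no server in common.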
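A server can respond to both AppendEntries and RequestVote without checking the current configuration. This mechanism allows new servers to participate without officially being part of the cluster. Without this feature, a new server could never be added, since it would refuse the log entries that precede the configuration entry adding it.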
When a new server joins, it won’t have any logs, and they need to be streamed. To ensure cluster availability, Raft allows this server to join the cluster as a non-voting member. Once it’s caught up, voting can be enabled. This also allows the cluster to remove this server in case it’s too slow to catch up, before giving voting rights (sort of like getting a green card to allow assimilation before citizenship is awarded providing voting rights).
One of the ways to keep the log from growing without bound is snapshotting. As soon as the state machine is synced to disk, the logs can be discarded.
Snapshots are taken by default after 10000 Raft entries, with a frequency of 30 minutes. The frequency indicates the time between two subsequent snapshots. These numbers can be adjusted using the corresponding snapshot options, such as snapshot-after-duration for the frequency. Snapshots are created only when the conditions set by both of these options have been met.
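The "both conditions" rule can be written as a single predicate. This is a sketch of the rule as stated above, not Dgraph's implementation: the function and parameter names are hypothetical, and whether the real thresholds are inclusive is an assumption.

```python
def should_snapshot(entries_since_last, seconds_since_last,
                    after_entries=10_000, after_seconds=30 * 60):
    """Snapshot only when BOTH the entry count and the elapsed time are reached."""
    enough_entries = entries_since_last >= after_entries
    enough_time = seconds_since_last >= after_seconds
    return enough_entries and enough_time
```

A burst of 20000 entries one minute after the previous snapshot does not trigger a new one, and neither does a quiet hour with only a handful of entries.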
Clients must locate the cluster to interact with it. Various approaches can be used for discovery.
A client can pick any server in the cluster at random. If the server isn’t the leader, the request should be rejected, and the leader information passed along. The client can then re-route its query to the leader. Alternatively, the server can proxy the client’s request to the leader.
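The reject-and-redirect variant might look like the following in-memory sketch. `Node`, `NotLeader`, and `send` are hypothetical names invented for this example; a real client would make network calls rather than method calls.

```python
class NotLeader(Exception):
    """Rejection that carries the identity of the current leader."""

    def __init__(self, leader_hint):
        super().__init__(f"not the leader; try {leader_hint}")
        self.leader_hint = leader_hint


class Node:
    def __init__(self, name, leader_name):
        self.name = name
        self.leader_name = leader_name

    def handle(self, query):
        if self.name != self.leader_name:
            raise NotLeader(self.leader_name)
        return f"{self.name} handled {query}"


def send(cluster, query, first_contact, max_hops=3):
    """Try `first_contact`, following leader hints for a bounded number of hops."""
    target = first_contact
    for _ in range(max_hops):
        try:
            return cluster[target].handle(query)
        except NotLeader as err:
            target = err.leader_hint
    raise RuntimeError("no leader found")
```

The hop limit guards against chasing stale hints forever while an election is in progress.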
When a client first starts up, it can register itself with the cluster using
This creates a new client id, which is used for all subsequent RPCs.
Servers must filter out duplicate requests. They can do this via session tracking, where they use the client id and a request UID set by the client to avoid reprocessing duplicate requests. RAFT also suggests storing responses along with the request UIDs, so that a server can send the same reply again if it receives a duplicate request.
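A minimal sketch of such session tracking is shown below. The `SessionTable` class is hypothetical and keeps everything in memory; it caches the response under the `(client id, request UID)` pair so that a retried request is answered without being applied twice.

```python
class SessionTable:
    def __init__(self):
        self._responses = {}  # (client_id, request_uid) -> stored response
        self.applied = 0      # number of operations actually executed

    def execute(self, client_id, request_uid, operation):
        """Run `operation` once per (client_id, request_uid); replay the reply after that."""
        key = (client_id, request_uid)
        if key not in self._responses:
            self._responses[key] = operation()
            self.applied += 1
        return self._responses[key]
```

In practice the table would also need to expire old sessions, which this sketch leaves out.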
Linearizability requires the results of a read to reflect the latest committed write. Serializability, on the other hand, allows stale reads.
To ensure linearizability of read-only queries run via the leader, the leader must take these steps:
- Leader must have at least one committed entry in its term. This ensures that it knows which entries are committed. (C’mon! Now that you’re in power do something at least!)
- Leader stores its latest commit index as readIndex.
- Leader sends Heartbeats to the cluster and waits for ACKs from a majority. Now it knows that it’s still the leader. (No successful coup. Yup, still the democratically elected dictator I was before!)
- Leader waits for its state machine to advance to readIndex.
- Leader can now run the queries against state machine and reply to clients.
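The five steps above can be traced in a toy, single-process model. Everything in this sketch is hypothetical: the `Leader` class, its fields, and the way heartbeat ACKs are passed in as a number. Waiting for the state machine is modeled by applying the entries synchronously.

```python
class Leader:
    """Toy in-memory leader; every attribute here is hypothetical."""

    def __init__(self, term, log, commit_index, acks, cluster_size):
        self.term = term
        self.log = log                    # list of (term, key, value); index = position + 1
        self.commit_index = commit_index  # number of committed entries
        self.acks = acks                  # heartbeat ACKs, counting the leader itself
        self.cluster_size = cluster_size
        self.applied = 0
        self.state = {}

    def apply_up_to(self, index):
        while self.applied < index:
            _, key, value = self.log[self.applied]
            self.state[key] = value
            self.applied += 1

    def read(self, key):
        # Step 1: at least one committed entry must come from the current term.
        committed = self.log[:self.commit_index]
        if not any(term == self.term for term, _, _ in committed):
            raise RuntimeError("no committed entry in the current term yet")
        # Step 2: remember the commit index for this read.
        read_index = self.commit_index
        # Step 3: a majority must still acknowledge this server as leader.
        if self.acks <= self.cluster_size // 2:
            raise RuntimeError("leadership not confirmed by a majority")
        # Step 4: bring the state machine up to read_index.
        self.apply_up_to(read_index)
        # Step 5: answer from the state machine.
        return self.state.get(key)
```

An uncommitted entry stays invisible to such a read, because the state machine is only advanced up to the stored index.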
Read-only queries can also be serviced by followers to reduce the load on the leader. But this could lead to stale results unless the follower confirms that its leader is still the real leader (it may not be after a network partition). To do so, it would have to send a query to the leader, and the leader would have to do the first three steps above. Then the follower can do the last two.
Read-only queries would have to be batched up, and then RPCs would have to go to the leader for each batch, who in turn would have to send further RPCs to the whole cluster. (This is not scalable without considerable optimizations to deal with latency.)
An alternative approach would be to have the servers return the index corresponding to their state machine. The client can then keep track of the maximum index it has received from replies so far and pass it along to the server with the next request. If a server’s state machine hasn’t reached the index provided by the client, it will not service the request. This approach avoids inter-server communication and is a lot more scalable. (This approach does not guarantee linearizability, but should converge quickly to the latest write.)
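A sketch of this client-side index tracking follows. `Replica`, `Client`, and `TooStale` are hypothetical names for this example, and refusing a request is modeled as raising an exception.

```python
class TooStale(Exception):
    """Raised when a replica has not yet reached the client's index."""


class Replica:
    def __init__(self, applied_index, data):
        self.applied_index = applied_index  # index reached by the state machine
        self.data = data

    def read(self, key, min_index):
        if self.applied_index < min_index:
            raise TooStale(f"at index {self.applied_index}, client needs {min_index}")
        return self.data.get(key), self.applied_index


class Client:
    def __init__(self):
        self.max_index = 0  # highest index seen in any reply so far

    def read(self, replica, key):
        value, index = replica.read(key, self.max_index)
        self.max_index = max(self.max_index, index)
        return value
```

Once a client has read from a replica at index 7, a replica still at index 4 refuses it, so the client never observes its view of the data moving backwards.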