Scale the shit out of this!

Starting with v0.8, we have focused purely on the stability and performance of Dgraph. Our feature set is at this point good enough for most users, so we’ve decided to freeze it until we reach v1.0.

Part of ensuring stability and performance led us to try loading the entire Stack Overflow dataset into Dgraph: around 2 billion edges. With full-text search, reverse edges and other indices, this grows to between 6 and 8 billion edges, which poses unique challenges.

Trying to load up this data has been an interesting journey in problem-solving. Every step of the way we made our best judgment to come up with a good solution – sometimes they stuck, other times they didn’t, only to be replaced by another solution.

To give you an idea of how we store data: we convert all outgoing edges from a node that share the same predicate into a single key-value pair. The value is typically a sorted list of integers, which we refer to as a posting list. Computer Science graduates familiar with Information Retrieval would instantly recognize this as an inverted index.

![Inverted Index](/images/posting list.png)

While typical search engines have the luxury of creating these posting lists once using map-reduce, in Dgraph we need to update them in real time as mutations flow in. But we don’t rewrite the posting list for every new mutation; that would be computationally expensive. Instead, we store these mutations in a layer above the list, and on some trigger, we merge this layer and regenerate the sorted integer list.
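To make the two layers concrete, here is a minimal sketch of a posting list with a mutable layer on top of an immutable sorted list. The type and method names (`PostingList`, `Mutate`, `Merge`) are made up for illustration and are not Dgraph's actual code; the map-based merge is chosen for brevity, not speed.

```go
package main

import (
	"fmt"
	"sort"
)

// PostingList is a hypothetical, simplified posting list: an immutable sorted
// slice of UIDs plus a mutable layer of pending changes kept above it.
type PostingList struct {
	sorted  []uint64        // immutable layer: sorted, de-duplicated UIDs
	pending map[uint64]bool // mutable layer: true = add, false = delete
}

// NewPostingList builds a list from the given UIDs in any order.
func NewPostingList(uids ...uint64) *PostingList {
	pl := &PostingList{pending: make(map[uint64]bool)}
	for _, u := range uids {
		pl.pending[u] = true
	}
	pl.Merge()
	return pl
}

// Mutate records an add or delete without rewriting the sorted list.
func (pl *PostingList) Mutate(uid uint64, add bool) {
	pl.pending[uid] = add
}

// Merge folds the mutable layer into a freshly generated sorted list.
func (pl *PostingList) Merge() {
	set := make(map[uint64]struct{}, len(pl.sorted)+len(pl.pending))
	for _, u := range pl.sorted {
		set[u] = struct{}{}
	}
	for u, add := range pl.pending {
		if add {
			set[u] = struct{}{}
		} else {
			delete(set, u)
		}
	}
	merged := make([]uint64, 0, len(set))
	for u := range set {
		merged = append(merged, u)
	}
	sort.Slice(merged, func(i, j int) bool { return merged[i] < merged[j] })
	pl.sorted = merged
	pl.pending = make(map[uint64]bool)
}

func main() {
	pl := NewPostingList(5, 1, 9)
	pl.Mutate(3, true)
	pl.Mutate(9, false)
	fmt.Println(len(pl.pending), pl.sorted) // 2 [1 5 9]
	pl.Merge()
	fmt.Println(len(pl.pending), pl.sorted) // 0 [1 3 5]
}
```

Mutations are cheap because they only touch the small layer; the expensive regeneration happens only in `Merge`.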

If a posting list has a mutation, we consider it dirty. We have a dirty channel, to which we push the corresponding key after every successful mutation. A few goroutines pick keys from this channel and sync them to disk. This sync step includes regenerating the posting list, followed by a write to the underlying key-value store.
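A rough sketch of that pipeline is below. The `store` type and `runSyncers` function are hypothetical stand-ins, and the "sync" here only counts writes rather than regenerating anything.

```go
package main

import (
	"fmt"
	"sync"
)

// store stands in for the underlying key-value store (hypothetical).
type store struct {
	mu     sync.Mutex
	writes map[string]int // number of syncs per key
}

func (s *store) sync(key string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.writes[key]++
}

// runSyncers starts n goroutines that drain the dirty channel and sync each
// key. It returns a function that blocks until all workers have finished.
func runSyncers(n int, dirty <-chan string, s *store) (wait func()) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for key := range dirty {
				s.sync(key) // real code: regenerate the list, then write it
			}
		}()
	}
	return wg.Wait
}

func main() {
	s := &store{writes: make(map[string]int)}
	dirty := make(chan string, 16)
	wait := runSyncers(4, dirty, s)

	// Every successful mutation pushes its key onto the dirty channel.
	for _, key := range []string{"alice|friend", "bob|friend", "alice|friend"} {
		dirty <- key
	}
	close(dirty)
	wait()
	fmt.Println(s.writes["alice|friend"], s.writes["bob|friend"]) // 2 1
}
```

Note that a key mutated twice is synced twice, which matters once some keys are much more popular than others.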

Stuck write throughput

Houston, we have a problem

The problem we hit was during loading. When we started loading the data, we saw our write throughput getting stuck at around 50-60K updates per second. The system was definitely capable of doing more than that.

After debugging, we realized that we were waiting too long for posting lists.

See, when you do a sync to disk, you need to block access to this posting list: you regenerate the value (which could be expensive), then write it to disk, and then delete the value from memory. Then, when someone accesses this posting list again, they fetch the value from the key-value store.

The premature optimization behind the delete was to save memory, but clearly, it could be avoided. We still wrote to disk, but stopped deleting the value from memory, so the next guy has it ready for more operations.

Things got better. But, we were still waiting too long on popular posting lists. After some head scratching, the culprit turned out to be the dirty channel. For every mutation, we were pushing this key to the dirty channel. And then these goroutines would trigger a sync to disk. For popular lists, we were doing this too frequently. Every mutation would in effect trigger a sync to disk, via the dirty channel. This was not good.

We needed a way to delay the push to the dirty channel. So, we introduced a goroutine and a local channel per posting list. On every successful mutation, we’d push to this local channel, and the goroutine would listen on it. When the goroutine received something on the channel, it would sleep for five seconds. On waking, it would check whether something new had arrived on the channel. If so, it would go back to sleep. If not, it would push to the global dirty channel.
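The delay loop might look roughly like this. It is a sketch under assumptions: `notify` and `delayedDirty` are invented names, the local channel is given a buffer of one so that mutations never block, and the delay is a parameter so the example finishes quickly.

```go
package main

import (
	"fmt"
	"time"
)

// notify does a non-blocking send: one pending signal is enough.
func notify(local chan<- struct{}) {
	select {
	case local <- struct{}{}:
	default:
	}
}

// delayedDirty is the per-key goroutine: it keeps sleeping while mutations
// keep arriving, and pushes the key to the global dirty channel once quiet.
func delayedDirty(key string, local <-chan struct{}, dirty chan<- string, delay time.Duration) {
	for range local {
		for quiet := false; !quiet; {
			time.Sleep(delay)
			select {
			case _, open := <-local:
				quiet = !open // more mutations arrived: sleep again (unless closed)
			default:
				quiet = true // nothing new: time to sync
			}
		}
		dirty <- key
	}
}

func main() {
	local := make(chan struct{}, 1)
	dirty := make(chan string, 1)
	go delayedDirty("alice|friend", local, dirty, 20*time.Millisecond)
	for i := 0; i < 3; i++ {
		notify(local) // three quick mutations
	}
	fmt.Println(<-dirty) // one push for the whole burst
}
```

The cost is one parked goroutine per key for as long as that key exists.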

This was a nifty idea, but it crumbled as soon as we ran it.

The Martian Movie - Explosion - Twentieth Century Fox/YouTube

The memory usage spiked, and Dgraph quickly went OOM. For a while, we couldn’t figure out why. The heap seemed normal. We debugged the cache; it seemed to be of normal size. The mutations weren’t consuming too much either. Nothing in the system looked like it was out of place.

The Stack

Then, almost accidentally, we saw the stack size. We always tracked Go heap size but had never tracked the stack. We just happened to print the entire Go memory stats and noticed that the stack size was totally out of whack.

We quickly noticed the number of goroutines. We had millions of them running, one per key. Each goroutine consumes at least 4KB in the stack (in practice, more). 10 million of them would consume at least 40GB!
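For anyone wanting to watch for this, the Go runtime exposes both numbers. The sketch below reads `runtime.MemStats.StackInuse` and `runtime.NumGoroutine()`, and redoes the arithmetic above in decimal units. The helper names are illustrative, and the 4KB per-goroutine figure is taken from the text; the real initial stack size depends on the Go version.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// stackStats reports the goroutine count and the bytes of stack in use.
func stackStats() (goroutines int, stackInuse uint64) {
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	return runtime.NumGoroutine(), m.StackInuse
}

// estimateStackBytes is the back-of-the-envelope arithmetic from the text.
func estimateStackBytes(goroutines, bytesPerStack uint64) uint64 {
	return goroutines * bytesPerStack
}

func main() {
	// 10 million goroutines at 4KB each, in decimal units.
	fmt.Println(estimateStackBytes(10_000_000, 4_000)/1_000_000_000, "GB") // 40 GB

	// Park some goroutines and compare stack usage before and after.
	_, before := stackStats()
	stop := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-stop // parked, like a per-key goroutine waiting for work
		}()
	}
	n, after := stackStats()
	fmt.Printf("goroutines=%d stack before=%dKB after=%dKB\n", n, before/1024, after/1024)
	close(stop)
	wg.Wait()
}
```

Logging these alongside heap size would have surfaced the problem much earlier.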

Learning a new lesson about the cost of cheap goroutines, we changed our solution. Note that when we pick something from the dirty channel, we don’t immediately work on it. We store it in a local dirty map. We then pick X% of total entries in the dirty map and trigger sync to disk on them.

Previously, we were just storing struct{} in the map value. We changed that to store a timestamp. We removed the per-posting-list goroutines and switched back to pushing to the dirty channel on every mutation. But when a key reached the local dirty map, we’d just update its timestamp.

When picking the X% of entries from the dirty map, we’d check whether the last mutation was at least five seconds old. If not, we’d just skip that list. This gave us the delay factor. We just had to add a small break condition to avoid looping over the entire dirty map.
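The timestamp-based dirty map can be sketched as follows. Everything here is an assumption made for illustration: the `dirtyMap` type, the use of Unix seconds, and the `want` and `maxScan` parameters standing in for "X% of entries" and the break condition.

```go
package main

import (
	"fmt"
	"time"
)

// dirtyMap maps a key to the Unix time (seconds) of its last mutation.
type dirtyMap map[string]int64

// touch records a mutation; repeated mutations only refresh the timestamp.
func (d dirtyMap) touch(key string, now int64) { d[key] = now }

// pick scans at most maxScan entries, returns up to want keys whose last
// mutation is at least delaySecs old, and removes them from the map.
func (d dirtyMap) pick(now, delaySecs int64, want, maxScan int) []string {
	var out []string
	scanned := 0
	for key, ts := range d {
		if len(out) >= want || scanned >= maxScan {
			break // break condition: never walk the whole map
		}
		scanned++
		if now-ts < delaySecs {
			continue // mutated too recently; give it more time
		}
		out = append(out, key)
		delete(d, key) // deleting during range is safe in Go
	}
	return out
}

func main() {
	d := dirtyMap{}
	start := time.Now().Unix()
	d.touch("cold", start)
	d.touch("hot", start)
	d.touch("hot", start+4) // mutated again; timestamp refreshed

	fmt.Println(d.pick(start+6, 5, 2, 10)) // [cold]
	fmt.Println(len(d))                    // 1
}
```

This gives the same delay as the per-key goroutines, at the cost of one map entry per dirty key instead of one stack.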

Now, this worked. We saw a jump in our mutations per second, from 50-60K to close to 100K. At least initially.

Range Anxiety

![The Martian Movie - Range Anxiety](/images/range anxiety.jpg)

The write throughput would start above 100K, but after a few minutes of running, the throughput would drop down to 70K, then 50K, then lower. We tried to figure out what was causing this drop. Turns out, some of our frequently written posting lists were taking longer and longer to add mutations.

Remember how we have a mutable layer above the immutable layer? For a small number of frequently updated posting lists, the time delay would almost never elapse. Within any five-second period, at least one mutation would touch the list, so its mutable layer grew unbounded. And this layer is slower to access than the immutable layer. So, new additions were getting slow.

Along with the time delay, we added a heuristic: after every 1000 writes to a list, its layers would be merged. This let the loading run for longer at higher throughputs. Another day, another fix!
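A count-based trigger like this is tiny in code. The sketch below uses a hypothetical `postingList` type where the immutable layer is reduced to a counter; only the threshold of 1000 comes from the text.

```go
package main

import "fmt"

const mergeEvery = 1000 // threshold taken from the text

// postingList is a stand-in: only the sizes of the two layers are tracked.
type postingList struct {
	pending []uint64 // mutable layer
	merged  int      // size of the immutable layer
	merges  int      // how many times a merge ran
}

// add appends to the mutable layer and merges once it reaches the threshold,
// so the layer can never grow beyond mergeEvery entries.
func (l *postingList) add(uid uint64) {
	l.pending = append(l.pending, uid)
	if len(l.pending) >= mergeEvery {
		l.merge()
	}
}

func (l *postingList) merge() {
	l.merged += len(l.pending)
	l.pending = l.pending[:0]
	l.merges++
}

func main() {
	l := &postingList{}
	for i := 0; i < 2500; i++ {
		l.add(uint64(i))
	}
	fmt.Println(l.merges, l.merged, len(l.pending)) // 2 2000 500
}
```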

Now that write throughput was OK, we had a new problem. After an hour or so of running, Dgraph would be killed by OOM.

We tried various things, over many days. We enabled swap space. We used sync.Pool in every critical path. We evicted things more aggressively. But all we were able to do was push it from one hour to three hours before it would OOM.

See, every time we read a posting list, we would store it in memory in case something else needed it. That’s our version of cache.

We used a sharded map to store these posting lists in memory. We’d have 64 shards, to allow fast concurrent access by multi core machines. The thing is, this cache would grow as you access more and more lists.

We had a mechanism in place to manage memory: a goroutine that periodically checked the memory usage of our program. If usage exceeded a limit specified by the user, we would evict one shard. That is, we’d go over all the keys in one shard and just delete them. During our various attempts, we also added a heuristic to increase that number to 3 shards if memory usage grew beyond some higher limit.
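Here is a simplified sketch of such a sharded cache with whole-shard eviction. `ShardedCache`, `EvictShard`, and `checkMemory` are illustrative names, the FNV hash and round-robin eviction order are assumptions, and memory usage is passed in as a number rather than measured.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const numShards = 64

type shard struct {
	mu    sync.RWMutex
	lists map[string][]uint64
}

// ShardedCache is a hypothetical sketch of a sharded in-memory cache.
type ShardedCache struct {
	shards [numShards]shard
	next   int // next shard to evict; only the monitor goroutine touches it
}

func NewShardedCache() *ShardedCache {
	c := &ShardedCache{}
	for i := range c.shards {
		c.shards[i].lists = make(map[string][]uint64)
	}
	return c
}

func (c *ShardedCache) shardFor(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key))
	return &c.shards[h.Sum32()%numShards]
}

func (c *ShardedCache) Put(key string, pl []uint64) {
	s := c.shardFor(key)
	s.mu.Lock()
	s.lists[key] = pl
	s.mu.Unlock()
}

func (c *ShardedCache) Len() int {
	n := 0
	for i := range c.shards {
		s := &c.shards[i]
		s.mu.RLock()
		n += len(s.lists)
		s.mu.RUnlock()
	}
	return n
}

// EvictShard drops every key in one shard and returns how many were dropped.
func (c *ShardedCache) EvictShard() int {
	s := &c.shards[c.next]
	c.next = (c.next + 1) % numShards
	s.mu.Lock()
	defer s.mu.Unlock()
	n := len(s.lists)
	s.lists = make(map[string][]uint64)
	return n
}

// checkMemory mimics one tick of the monitor: evict a shard when over limit.
func checkMemory(c *ShardedCache, usedBytes, limitBytes uint64) int {
	if usedBytes <= limitBytes {
		return 0
	}
	return c.EvictShard()
}

func main() {
	c := NewShardedCache()
	for i := 0; i < 10000; i++ {
		c.Put(fmt.Sprintf("key-%d", i), []uint64{uint64(i)})
	}
	before := c.Len()
	dropped := checkMemory(c, 2<<30, 1<<30) // pretend usage is over the limit
	fmt.Println(before, before-c.Len() == dropped) // 10000 true
}
```

Each tick over the limit throws away roughly 1/64th of the cache, regardless of how recently those entries were used.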

In theory, this works OK. You go over memory, you discard a chunk of items, and let Go and hence the OS reclaim the memory. Now, Go is slow in releasing memory to the OS, so we’d call debug.FreeOSMemory() to avoid the OS killing the process with an OOM.

In practice, it sank fast! First, Go might still not release memory despite calls to debug.FreeOSMemory(). It’s a suggestion, not a command. It might hold tens of GBs of memory, while the OS is close to killing the process. Second, we didn’t account for an interesting behavior.

Out of memory

Every time the goroutine found our memory usage above the threshold, it would trigger an EvictShard(). What happened was that memory usage would stay above the limit for a while, causing many shard evictions. Eventually, almost the entire cache would be evicted, and the memory usage would go below the threshold (but not by much, because Go holds on to memory).

With memory back to a normal state, the system would accept requests again and resume loading the data. But the requests had been retrying for a while, so they’d reload too many posting lists all at once (with an almost empty cache, on top of that). The memory usage would suddenly spike, and before the goroutine could trigger another shard eviction, the program would OOM.

The lesson learned here was not only to keep the program from going above the normal limit (which is what we were doing) but also to avoid evicting big chunks of cache all at once. Because when it comes back, it comes back with a vengeance!

So, we switched to a single shard LRU cache, which would release memory consistently over time (well, we should have had that in the first place, but it was slower compared to sharded map; so it was a pending investigative TODO item for over a year).

Also, we couldn’t rely on the goroutine monitoring Go memory to tell us how much to release, because it was unreliable. So, we just set the size of the LRU cache upfront and maintained that over time.
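A fixed-size LRU can be built from the standard library's `container/list` plus a map. The sketch below is a generic LRU, not Dgraph's implementation: the `LRU` type is hypothetical, capacity is counted in entries rather than bytes, and locking is left out.

```go
package main

import (
	"container/list"
	"fmt"
)

type entry struct {
	key string
	pl  []uint64
}

// LRU is a hypothetical single-shard, fixed-capacity cache. It is not
// goroutine-safe; a real one would guard these methods with a mutex.
type LRU struct {
	capacity int
	order    *list.List // front = most recently used
	items    map[string]*list.Element
}

func NewLRU(capacity int) *LRU {
	return &LRU{capacity: capacity, order: list.New(), items: make(map[string]*list.Element)}
}

// Get returns the cached list and marks it as recently used.
func (c *LRU) Get(key string) ([]uint64, bool) {
	el, ok := c.items[key]
	if !ok {
		return nil, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).pl, true
}

// Put inserts or updates a list, evicting the least recently used entry
// when the cache would exceed its capacity.
func (c *LRU) Put(key string, pl []uint64) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).pl = pl
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(&entry{key: key, pl: pl})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

func (c *LRU) Len() int { return c.order.Len() }

func main() {
	c := NewLRU(2)
	c.Put("a", []uint64{1})
	c.Put("b", []uint64{2})
	c.Get("a")              // "a" is now the most recently used
	c.Put("c", []uint64{3}) // evicts "b", one entry at a time
	_, okA := c.Get("a")
	_, okB := c.Get("b")
	fmt.Println(okA, okB, c.Len()) // true false 2
}
```

Eviction happens one entry per insert, so the cache never drops a large chunk at once and never needs to ask the runtime how much memory is in use.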

That fixed our OOM issue. Almost surprisingly!

Getting back to your perf review

Now that the memory issue was solved, it was time to load the entire 2B edges and play with it. Almost!

After some hours of loading data, we realized we had some huge posting lists, containing millions of integers. These were the frequently updated ones, which have a very high fan-out. That is, one node has millions of edges going outwards.

This particularly troubled us in indices. Take, for example, the type index, which points from a certain type to all the instances of that type. If you have hundreds of millions of comments, all these comments would be referenced in one posting list. Another good example is full-text search, where certain common terms can cause high fan-out nodes.

As the list size grew, re-encoding the list after every 1000 mutations was getting slower and slower. This would cause all writes waiting on this list to slow down considerably, which in turn slowed down the data loading.

This is where we are right now! And we have ideas to solve this problem.

One is to split the posting list into smaller shards, essentially creating sharded posting lists, with each shard holding a contiguous portion of the sorted list. This way, the mutations could be applied to these shards concurrently, and each of them could be re-encoded easily.
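A rough sketch of range-sharded posting lists follows. It illustrates the idea only and is not the code we wrote: `ShardedList`, the `maxShard` limit, and the split-in-half policy are all assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// ShardedList splits one logical sorted list into shards, each holding a
// contiguous range of UIDs: everything in shards[i] < everything in shards[i+1].
type ShardedList struct {
	maxShard int        // hypothetical size limit before a shard is split
	shards   [][]uint64 // each shard is sorted and non-empty
}

// Add inserts uid into the one shard whose range covers it, so only that
// shard needs re-encoding. Oversized shards are split in half.
func (s *ShardedList) Add(uid uint64) {
	if len(s.shards) == 0 {
		s.shards = [][]uint64{{uid}}
		return
	}
	// First shard whose largest element is >= uid, else the last shard.
	i := sort.Search(len(s.shards), func(i int) bool {
		sh := s.shards[i]
		return sh[len(sh)-1] >= uid
	})
	if i == len(s.shards) {
		i--
	}
	sh := s.shards[i]
	j := sort.Search(len(sh), func(j int) bool { return sh[j] >= uid })
	if j < len(sh) && sh[j] == uid {
		return // already present
	}
	sh = append(sh, 0)
	copy(sh[j+1:], sh[j:])
	sh[j] = uid
	s.shards[i] = sh
	if len(sh) > s.maxShard {
		s.split(i)
	}
}

func (s *ShardedList) split(i int) {
	sh := s.shards[i]
	mid := len(sh) / 2
	left := append([]uint64(nil), sh[:mid]...)
	right := append([]uint64(nil), sh[mid:]...)
	s.shards = append(s.shards, nil)
	copy(s.shards[i+2:], s.shards[i+1:])
	s.shards[i], s.shards[i+1] = left, right
}

// All concatenates every shard, which is what iteration or intersection
// over the whole list would need to read.
func (s *ShardedList) All() []uint64 {
	var out []uint64
	for _, sh := range s.shards {
		out = append(out, sh...)
	}
	return out
}

func main() {
	s := &ShardedList{maxShard: 3}
	for _, u := range []uint64{10, 20, 30, 40, 25, 5} {
		s.Add(u)
	}
	fmt.Println(s.shards) // [[5 10 20] [25 30 40]]
	fmt.Println(s.All())  // [5 10 20 25 30 40]
}
```

Writes touch a single small shard, but `All` shows the flip side: reading the full list means visiting every shard.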

The problem with this approach is that when iterating or intersecting these posting lists, all the shards of the posting list would have to be brought into memory. This is bad for two reasons. One, bringing the whole thing into memory is still a problem. And two, more disk reads are now required to read the posting list.

Without going into details, the other option, which is based on research, is to encode the posting list smartly.

We initially worked on the first approach, but after writing it, we quickly rejected it in favor of the second approach.

The posting list encoding change is currently being implemented and tested. We’ll know soon how effective this is, and what new issues we encounter along the path to scaling Dgraph to billions of edges. Stay tuned!

P.S. To see the commits corresponding to this blog post, see recent activity here.

Update: We’ve gone on to produce a new tool for offline bulk loading, with significantly improved loading speed.