At Dgraph , we aim to build a low latency, distributed graph database. This means our data is distributed among nodes in the cluster. Executing a query means multiple nodes are communicating with each other. To keep our latency of communication low, we use a new form of serialization library called Flatbuffers.
What sets FlatBuffers apart is that it represents hierarchical data in a flat binary buffer in such a way that it can still be accessed directly without parsing/unpacking, while also still supporting data structure evolution (forwards/backwards compatibility). The only memory needed to access your data is that of the buffer.
How is Flatbuffers better than Protocol Buffers? FlatBuffers does not need a parsing/unpacking step to a secondary representation before you can access data, often coupled with per-object memory allocation.
Dgraph responses can contain millions of entities and binary blob values. And the fact that Flatbuffers doesn’t need to recreate the entire information in language specific data structures is very helpful for both memory and speed.
Also, TCP is always going to be faster than HTTP, because HTTP is one extra layer on top of TCP. So, our goal was to implement communication using RPC over custom encoding utilizing Flatbuffers.
Our first approach was to use Go language library net/rpc
and implement custom encoding in it.
Helper function to deal with writing and parsing header for the payload:
/*
* Copyright 2016 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package conn
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"github.com/dgraph-io/dgraph/x"
)
type Query struct {
Data []byte
}
type Reply struct {
Data []byte
}
func writeHeader(rwc io.ReadWriteCloser, seq uint64,
method string, data []byte) error {
var bh bytes.Buffer
var rerr error
// In package x: func SetError(prev *error, n error)
x.SetError(&rerr, binary.Write(&bh, binary.LittleEndian, seq))
x.SetError(&rerr, binary.Write(&bh, binary.LittleEndian, int32(len(method))))
x.SetError(&rerr, binary.Write(&bh, binary.LittleEndian, int32(len(data))))
_, err := bh.Write([]byte(method))
x.SetError(&rerr, err)
if rerr != nil {
return rerr
}
_, err = rwc.Write(bh.Bytes())
return err
}
func parseHeader(rwc io.ReadWriteCloser, seq *uint64,
method *string, plen *int32) error {
var err error
var sz int32
x.SetError(&err, binary.Read(rwc, binary.LittleEndian, seq))
x.SetError(&err, binary.Read(rwc, binary.LittleEndian, &sz))
x.SetError(&err, binary.Read(rwc, binary.LittleEndian, plen))
if err != nil {
return err
}
buf := make([]byte, sz)
n, err := rwc.Read(buf)
if err != nil {
return err
}
if n != int(sz) {
return fmt.Errorf("Expected: %v. Got: %v\n", sz, n)
}
*method = string(buf)
return nil
}
Code at server to read requests and write responses:
/*
* Copyright 2016 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package conn
import (
"io"
"log"
"net/rpc"
)
type ServerCodec struct {
Rwc io.ReadWriteCloser
payloadLen int32
}
func (c *ServerCodec) ReadRequestHeader(r *rpc.Request) error {
return parseHeader(c.Rwc, &r.Seq, &r.ServiceMethod, &c.payloadLen)
}
func (c *ServerCodec) ReadRequestBody(data interface{}) error {
b := make([]byte, c.payloadLen)
_, err := io.ReadFull(c.Rwc, b)
if err != nil {
return err
}
if data == nil {
// If data is nil, discard this request.
return nil
}
query := data.(*Query)
query.Data = b
return nil
}
func (c *ServerCodec) WriteResponse(resp *rpc.Response,
data interface{}) error {
if len(resp.Error) > 0 {
log.Fatal("Response has error: " + resp.Error)
}
if data == nil {
log.Fatal("Worker write response data is nil")
}
reply, ok := data.(*Reply)
if !ok {
log.Fatal("Unable to convert to reply")
}
if err := writeHeader(c.Rwc, resp.Seq,
resp.ServiceMethod, reply.Data); err != nil {
return err
}
_, err := c.Rwc.Write(reply.Data)
return err
}
func (c *ServerCodec) Close() error {
return c.Rwc.Close()
}
Similarly, the code at the client to read requests and write responses:
/*
* Copyright 2016 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package conn
import (
"errors"
"fmt"
"io"
"log"
"net/rpc"
)
type ClientCodec struct {
Rwc io.ReadWriteCloser
payloadLen int32
}
func (c *ClientCodec) WriteRequest(r *rpc.Request, body interface{}) error {
if body == nil {
return fmt.Errorf("Nil request body from client.")
}
query := body.(*Query)
if err := writeHeader(c.Rwc, r.Seq, r.ServiceMethod, query.Data); err != nil {
return err
}
n, err := c.Rwc.Write(query.Data)
if n != len(query.Data) {
return errors.New("Unable to write payload.")
}
return err
}
func (c *ClientCodec) ReadResponseHeader(r *rpc.Response) error {
if len(r.Error) > 0 {
log.Fatal("client got response error: " + r.Error)
}
if err := parseHeader(c.Rwc, &r.Seq,
&r.ServiceMethod, &c.payloadLen); err != nil {
return err
}
return nil
}
func (c *ClientCodec) ReadResponseBody(body interface{}) error {
buf := make([]byte, c.payloadLen)
_, err := io.ReadFull(c.Rwc, buf)
reply := body.(*Reply)
reply.Data = buf
return err
}
func (c *ClientCodec) Close() error {
return c.Rwc.Close()
}
Also, each server should be able to send multiple requests in parallel. So, we built a connection pool to create, store and reuse multiple connections:
/*
* Copyright 2016 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package conn
import (
"net"
"net/rpc"
"strings"
"time"
"github.com/dgraph-io/dgraph/x"
)
var glog = x.Log("conn") // In package x: func Log(p string) *logrus.Entry
type Pool struct {
clients chan *rpc.Client
Addr string
}
func NewPool(addr string, maxCap int) *Pool {
p := new(Pool)
p.Addr = addr
p.clients = make(chan *rpc.Client, maxCap)
client, err := p.dialNew()
if err != nil {
glog.Fatal(err)
return nil
}
p.clients <- client
return p
}
func (p *Pool) dialNew() (*rpc.Client, error) {
d := &net.Dialer{
Timeout: 3 * time.Minute,
}
var nconn net.Conn
var err error
// This loop will retry for 10 minutes before giving up.
for i := 0; i < 60; i++ {
nconn, err = d.Dial("tcp", p.Addr)
if err == nil {
break
}
if !strings.Contains(err.Error(), "refused") {
break
}
glog.WithField("error", err).WithField("addr", p.Addr).
Info("Retrying connection...")
time.Sleep(10 * time.Second)
}
if err != nil {
return nil, err
}
cc := &ClientCodec{
Rwc: nconn,
}
return rpc.NewClientWithCodec(cc), nil
}
func (p *Pool) Call(serviceMethod string, args interface{},
reply interface{}) error {
client, err := p.get()
if err != nil {
return err
}
if err = client.Call(serviceMethod, args, reply); err != nil {
return err
}
select {
case p.clients <- client:
return nil
default:
return client.Close()
}
}
func (p *Pool) get() (*rpc.Client, error) {
select {
case client := <-p.clients:
return client, nil
default:
return p.dialNew()
}
}
func (p *Pool) Close() error {
// We're not doing a clean exit here. A clean exit here would require
// synchronization, which seems unnecessary for now. But, we should
// add one if required later.
return nil
}
This worked well. And both v0.2 and v0.3 of Dgraph were using this code for the nodes to communicate with each other.
At Dgraph , we spend Fridays learning and improving. This means reading books, papers, articles, watching talks. And we came across a great talk by Jeff Dean of Google: Rapid Response Times
As I mentioned above, we care a lot about query latency. After watching it a couple of times from two different conferences, the prime learning I gathered from his talk was:
Jeff Dean has an impressive track record at Google. He’s behind almost every distributed system in production at Google. So, when he gives a suggestion, you take it seriously.
At v0.4, we’re not doing replication yet. So, we can’t send queries to multiple servers in parallel. However, that’s how the system is going to look like a few minor releases down the lane.
So, we started thinking about how we could change our custom encoding based RPC implementation to achieve something like this. Around the same time, we were looking for a way to figure out slow rpcs on servers. Dave Cheney’s response pointed us to grpc.io.
While I had considered Google built grpc
in the past, I’d rejected it understanding that it requires you to use Protocol Buffers; but we’d already chosen to go with Flatbuffers.
But when Sameer Ajmani’s talk pointed that grpc
is essentially a rewrite of Google internal Stubby from ground up, that got me to dig deeper.
grpc
came with net/context
which could easily do what Jeff Dean had talked about.
Also, it can help see live rpcs and track the slowest ones.
Overall, there was a lot of advantages to switching to grpc
.
But, we didn’t want to give up the performance benefits of Flatbuffers.
So, digging deeper, we found that grpc
did support custom encoding. And we implemented it.
This is the whole equivalent code implemented in grpc
:
/*
* Copyright 2016 DGraph Labs, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package worker
import (
"log"
"google.golang.org/grpc"
)
type PayloadCodec struct{}
func (cb *PayloadCodec) Marshal(v interface{}) ([]byte, error) {
p, ok := v.(*Payload)
if !ok {
log.Fatalf("Invalid type of struct: %+v", v)
}
return p.Data, nil
}
func (cb *PayloadCodec) Unmarshal(data []byte, v interface{}) error {
p, ok := v.(*Payload)
if !ok {
log.Fatalf("Invalid type of struct: %+v", v)
}
p.Data = data
return nil
}
func (cb *PayloadCodec) String() string {
return "worker.PayloadCodec"
}
type Pool struct {
conns chan *grpc.ClientConn
Addr string
}
func NewPool(addr string, maxCap int) *Pool {
p := new(Pool)
p.Addr = addr
p.conns = make(chan *grpc.ClientConn, maxCap)
conn, err := p.dialNew()
if err != nil {
glog.Fatal(err)
return nil
}
p.conns <- conn
return p
}
func (p *Pool) dialNew() (*grpc.ClientConn, error) {
return grpc.Dial(p.Addr, grpc.WithInsecure(), grpc.WithInsecure(),
grpc.WithCodec(&PayloadCodec{}))
}
func (p *Pool) Get() (*grpc.ClientConn, error) {
select {
case conn := <-p.conns:
return conn, nil
default:
return p.dialNew()
}
}
func (p *Pool) Put(conn *grpc.ClientConn) error {
select {
case p.conns <- conn:
return nil
default:
return conn.Close()
}
}
And this is the proto file with Payload and service
:
syntax = "proto3";
package worker;
message Payload {
bytes Data = 1;
}
service Worker {
rpc Hello (Payload) returns (Payload) {}
rpc GetOrAssign (Payload) returns (Payload) {}
rpc Mutate (Payload) returns (Payload) {}
rpc ServeTask (Payload) returns (Payload) {}
}
So, turns out, grpc
not only does custom encoding, but it also leads to
For more, read the pull request which made this change across our code base. Hope you find this useful.
Also read: