Custom encoding: Go implementation in net/rpc vs grpc and why we switched

At Dgraph, we aim to build a low latency, distributed graph database. This means our data is distributed among nodes in the cluster. Executing a query means multiple nodes are communicating with each other. To keep our latency of communication low, we use a new form of serialization library called Flatbuffers.

What sets FlatBuffers apart is that it represents hierarchical data in a flat binary buffer in such a way that it can still be accessed directly without parsing/unpacking, while also still supporting data structure evolution (forwards/backwards compatibility). The only memory needed to access your data is that of the buffer.

How is Flatbuffers better than Protocol Buffers? FlatBuffers does not need a parsing/unpacking step to a secondary representation before you can access data, often coupled with per-object memory allocation.

Dgraph responses can contain millions of entities and binary blob values. And the fact that Flatbuffers doesn’t need to recreate the entire information in language specific data structures is very helpful for both memory and speed.

Also, TCP is always going to be faster than HTTP, because HTTP is one extra layer on top of TCP. So, our goal was to implement communication using RPC over custom encoding utilizing Flatbuffers.

Go net/rpc

Our first approach was to use the Go standard library package net/rpc and implement custom encoding in it.

Helper functions for writing and parsing the header that precedes each payload:

/*
 * Copyright 2016 DGraph Labs, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package conn

import (
    "bytes"
    "encoding/binary"
    "fmt"
    "io"

    "github.com/dgraph-io/dgraph/x"
)

// Query is the request value handled by the codecs in this package. The
// client codec writes Data to the connection after the header, and the
// server codec stores the received request body in Data.
type Query struct {
    Data []byte
}

// Reply is the response value handled by the codecs in this package. The
// server codec writes Data to the connection after the header, and the
// client codec stores the received response body in Data.
type Reply struct {
    Data []byte
}

// writeHeader serializes a message header and writes it to rwc in a single
// Write call.
//
// The header consists of three little-endian fixed-size fields -- seq as a
// uint64, len(method) as an int32 and len(data) as an int32 -- followed by
// the bytes of method. The payload in data is NOT written here; only its
// length is recorded, and the caller is expected to write it afterwards.
//
// It returns the error recorded while assembling the header, if any,
// otherwise the error from writing to rwc.
func writeHeader(rwc io.ReadWriteCloser, seq uint64,
    method string, data []byte) error {

    var (
        hdr      bytes.Buffer
        buildErr error
    )

    // Fixed-size fields, in wire order.
    // In package x: func SetError(prev *error, n error)
    fields := []interface{}{seq, int32(len(method)), int32(len(data))}
    for _, field := range fields {
        x.SetError(&buildErr, binary.Write(&hdr, binary.LittleEndian, field))
    }

    // Variable-size tail: the method name itself.
    _, werr := hdr.WriteString(method)
    x.SetError(&buildErr, werr)
    if buildErr != nil {
        return buildErr
    }

    _, werr = rwc.Write(hdr.Bytes())
    return werr
}

// parseHeader reads one message header, in the layout produced by
// writeHeader, from rwc.
//
// The header is three little-endian fixed-size fields -- the uint64 sequence
// number, the int32 length of the method name and the int32 length of the
// payload -- followed by the method name bytes. On success *seq, *method and
// *plen are filled in; the payload itself is left unread on rwc.
//
// Parsing stops at the first failed read. Read errors are returned unwrapped
// so that callers can still compare them against io.EOF and
// io.ErrUnexpectedEOF. A header announcing a negative length is rejected with
// an error instead of being passed on to make, which would panic.
func parseHeader(rwc io.ReadWriteCloser, seq *uint64,
    method *string, plen *int32) error {

    if err := binary.Read(rwc, binary.LittleEndian, seq); err != nil {
        return err
    }
    var sz int32
    if err := binary.Read(rwc, binary.LittleEndian, &sz); err != nil {
        return err
    }
    if err := binary.Read(rwc, binary.LittleEndian, plen); err != nil {
        return err
    }
    if sz < 0 || *plen < 0 {
        return fmt.Errorf("invalid header: method length %d, payload length %d",
            sz, *plen)
    }

    // A single Read on a stream may return fewer bytes than requested even
    // though more are on the way, so keep reading until the whole method
    // name has arrived.
    buf := make([]byte, sz)
    if _, err := io.ReadFull(rwc, buf); err != nil {
        return err
    }
    *method = string(buf)
    return nil
}

Code at server to read requests and write responses:

/*
 * Copyright 2016 DGraph Labs, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package conn

import (
    "io"
    "log"
    "net/rpc"
)

// ServerCodec is the server-side codec for net/rpc that speaks this
// package's header-plus-raw-payload wire format over Rwc.
//
// payloadLen holds the payload length taken from the most recently parsed
// request header; ReadRequestBody uses it to know how many bytes to read.
type ServerCodec struct {
    Rwc        io.ReadWriteCloser
    payloadLen int32
}

// ReadRequestHeader parses the next request header from the connection,
// filling r.Seq and r.ServiceMethod and remembering the payload length for
// the ReadRequestBody call that follows.
func (c *ServerCodec) ReadRequestHeader(r *rpc.Request) error {
    return parseHeader(c.Rwc, &r.Seq, &r.ServiceMethod, &c.payloadLen)
}

// ReadRequestBody reads exactly payloadLen bytes (as announced by the last
// parsed header) from the connection and stores them in the *Query that data
// points to.
//
// When data is nil the bytes are still consumed, so the stream stays aligned
// on the next header, but they are then dropped. A non-nil data that is not
// a *Query makes the type assertion panic.
func (c *ServerCodec) ReadRequestBody(data interface{}) error {
    payload := make([]byte, c.payloadLen)
    if _, err := io.ReadFull(c.Rwc, payload); err != nil {
        return err
    }

    // A nil destination means the request is to be discarded.
    if data != nil {
        data.(*Query).Data = payload
    }
    return nil
}

// WriteResponse sends a response: a header carrying resp.Seq,
// resp.ServiceMethod and the payload length, followed by the raw bytes of
// the *Reply in data.
//
// NOTE: the wire format has no field for an error string, so a response with
// a non-empty resp.Error, a nil data, or a data that is not a *Reply is
// reported through log.Fatal, which terminates the process.
func (c *ServerCodec) WriteResponse(resp *rpc.Response,
    data interface{}) error {

    switch {
    case len(resp.Error) > 0:
        log.Fatal("Response has error: " + resp.Error)
    case data == nil:
        log.Fatal("Worker write response data is nil")
    }

    reply, isReply := data.(*Reply)
    if !isReply {
        log.Fatal("Unable to convert to reply")
    }

    // Header first, then the payload it announces.
    err := writeHeader(c.Rwc, resp.Seq, resp.ServiceMethod, reply.Data)
    if err == nil {
        _, err = c.Rwc.Write(reply.Data)
    }
    return err
}

// Close closes the underlying connection and returns its error, if any.
func (c *ServerCodec) Close() error {
    return c.Rwc.Close()
}

Similarly, the code at the client to write requests and read responses:

/*
 * Copyright 2016 DGraph Labs, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package conn

import (
    "errors"
    "fmt"
    "io"
    "log"
    "net/rpc"
)

// ClientCodec is the client-side codec for net/rpc that speaks this
// package's header-plus-raw-payload wire format over Rwc.
//
// payloadLen holds the payload length taken from the most recently parsed
// response header; ReadResponseBody uses it to know how many bytes to read.
type ClientCodec struct {
    Rwc        io.ReadWriteCloser
    payloadLen int32
}

// WriteRequest sends a request: a header carrying r.Seq, r.ServiceMethod and
// the payload length, followed by the raw bytes of the *Query in body.
//
// It returns an error, rather than panicking, when body is nil or is not a
// *Query. If writing the payload fails, the error reported by the connection
// is returned as-is so the real cause is not hidden; the generic "Unable to
// write payload." error is used only for a short write that came back
// without an error.
func (c *ClientCodec) WriteRequest(r *rpc.Request, body interface{}) error {
    if body == nil {
        return fmt.Errorf("Nil request body from client.")
    }

    query, ok := body.(*Query)
    if !ok || query == nil {
        return fmt.Errorf("unexpected request body type %T, want non-nil *Query", body)
    }
    if err := writeHeader(c.Rwc, r.Seq, r.ServiceMethod, query.Data); err != nil {
        return err
    }

    n, err := c.Rwc.Write(query.Data)
    if err != nil {
        return err
    }
    if n != len(query.Data) {
        return errors.New("Unable to write payload.")
    }
    return nil
}

// ReadResponseHeader parses the next response header from the connection,
// filling r.Seq and r.ServiceMethod and remembering the payload length for
// the ReadResponseBody call that follows.
//
// If r already carries a non-empty Error on entry, it is reported through
// log.Fatal, which terminates the process.
func (c *ClientCodec) ReadResponseHeader(r *rpc.Response) error {
    if r.Error != "" {
        log.Fatal("client got response error: " + r.Error)
    }
    return parseHeader(c.Rwc, &r.Seq, &r.ServiceMethod, &c.payloadLen)
}

// ReadResponseBody reads exactly payloadLen bytes (as announced by the last
// parsed header) from the connection and stores them in the *Reply that body
// points to.
//
// net/rpc may call this with a nil body to have a response read and
// discarded; in that case the bytes are consumed, so the stream stays
// aligned on the next header, and nothing is stored. On a read error the
// reply is left untouched instead of receiving a partially filled buffer.
// A body that is not a non-nil *Reply yields an error rather than a panic.
func (c *ClientCodec) ReadResponseBody(body interface{}) error {
    buf := make([]byte, c.payloadLen)
    if _, err := io.ReadFull(c.Rwc, buf); err != nil {
        return err
    }
    if body == nil {
        // Discard request: the payload has been drained, nothing to fill in.
        return nil
    }

    reply, ok := body.(*Reply)
    if !ok || reply == nil {
        return fmt.Errorf("unexpected response body type %T, want non-nil *Reply", body)
    }
    reply.Data = buf
    return nil
}

// Close closes the underlying connection and returns its error, if any.
func (c *ClientCodec) Close() error {
    return c.Rwc.Close()
}

Also, each server should be able to send multiple requests in parallel. So, we built a connection pool to create, store and reuse multiple connections:

/*
 * Copyright 2016 DGraph Labs, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package conn

import (
    "net"
    "net/rpc"
    "strings"
    "time"

    "github.com/dgraph-io/dgraph/x"
)

// glog is the logger used by this package, obtained from x.Log with the
// name "conn".
var glog = x.Log("conn") // In package x: func Log(p string) *logrus.Entry

// Pool hands out rpc clients connected to Addr. Idle clients wait in the
// buffered clients channel; when it is empty a new connection is dialed, and
// when it is full a returned client is closed instead of being kept.
type Pool struct {
    clients chan *rpc.Client
    Addr    string
}

// NewPool creates a pool for addr that keeps at most maxCap idle clients and
// eagerly dials the first one.
//
// A maxCap below 1 is treated as 1: the freshly dialed client is parked in
// the channel before returning, so the channel needs room for at least one
// entry. An unbuffered channel would block that send forever, and a negative
// size would make the channel allocation panic.
//
// If the initial dial fails, the error is passed to glog.Fatal.
func NewPool(addr string, maxCap int) *Pool {
    if maxCap < 1 {
        maxCap = 1
    }
    p := &Pool{
        Addr:    addr,
        clients: make(chan *rpc.Client, maxCap),
    }

    client, err := p.dialNew()
    if err != nil {
        glog.Fatal(err)
        return nil
    }
    p.clients <- client
    return p
}

// dialNew opens a TCP connection to p.Addr and wraps it in an rpc client
// that uses ClientCodec.
//
// Each dial attempt has a 3 minute timeout. While the error text contains
// "refused", the dial is retried: up to 60 attempts, each failed one
// followed by a 10 second sleep (roughly 10 minutes of waiting in total).
// Any other dial error ends the loop immediately and is returned.
func (p *Pool) dialNew() (*rpc.Client, error) {
    dialer := net.Dialer{Timeout: 3 * time.Minute}

    var (
        conn    net.Conn
        dialErr error
    )
    for attempt := 0; attempt < 60; attempt++ {
        conn, dialErr = dialer.Dial("tcp", p.Addr)
        // Stop on success, and on any failure other than a refused connection.
        if dialErr == nil || !strings.Contains(dialErr.Error(), "refused") {
            break
        }

        glog.WithField("error", dialErr).WithField("addr", p.Addr).
            Info("Retrying connection...")
        time.Sleep(10 * time.Second)
    }
    if dialErr != nil {
        return nil, dialErr
    }

    return rpc.NewClientWithCodec(&ClientCodec{Rwc: conn}), nil
}

// Call takes a client from the pool (dialing a new one if none is idle),
// invokes serviceMethod on it with args and reply, and waits for the call to
// complete.
//
// After a successful call the client goes back into the pool, or is closed
// if the pool is already full; in the latter case the Close error is
// returned. After a failed call the client is closed and not reused, so its
// connection is released instead of being left open with nobody holding a
// reference to it. The error from the call is what gets returned.
func (p *Pool) Call(serviceMethod string, args interface{},
    reply interface{}) error {

    client, err := p.get()
    if err != nil {
        return err
    }
    if err = client.Call(serviceMethod, args, reply); err != nil {
        // Best effort: the Call error is the one worth reporting, and Close
        // on an already shut down client only returns rpc.ErrShutdown.
        _ = client.Close()
        return err
    }

    select {
    case p.clients <- client:
        return nil
    default:
        return client.Close()
    }
}

// get returns an idle client from the pool without blocking; if none is
// waiting it dials a new connection instead.
func (p *Pool) get() (*rpc.Client, error) {
    select {
    case pooled := <-p.clients:
        return pooled, nil
    default:
    }
    // Nothing idle right now.
    return p.dialNew()
}

// Close is currently a no-op that always returns nil; clients still sitting
// in the pool are not closed by it.
func (p *Pool) Close() error {
    // We're not doing a clean exit here. A clean exit here would require
    // synchronization, which seems unnecessary for now. But, we should
    // add one if required later.
    return nil
}

This worked well. And both v0.2 and v0.3 of Dgraph were using this code for the nodes to communicate with each other.


The Switch

At Dgraph, we spend Fridays learning and improving. This means reading books, papers and articles, and watching talks. And we came across a great talk by Jeff Dean of Google: Rapid Response Times.

As I mentioned above, we care a lot about query latency. After watching it a couple of times from two different conferences, the prime learning I gathered from his talk was:

  • Send the request to the first replica, telling it that the same request is also going to be sent to a second one.
  • 2 ms later, send the request to the second one, telling it that it’s already sent to the first one.
  • When one of them starts processing the request, it sends a cancellation request directly to its peer.
  • If the peer hasn’t started processing the request, it would just cancel the request.
  • In a rare case, both of them process it and overall do twice the work.
  • Overall, your latencies improve considerably due to this method.

Jeff Dean has an impressive track record at Google. He’s behind almost every distributed system in production at Google. So, when he gives a suggestion, you take it seriously.

At v0.4, we’re not doing replication yet. So, we can’t send queries to multiple servers in parallel. However, that’s what the system is going to look like a few minor releases down the line.

So, we started thinking about how we could change our custom encoding based RPC implementation to achieve something like this. Around the same time, we were looking for a way to figure out slow rpcs on servers. Dave Cheney’s response pointed us to grpc.io.

While I had considered Google’s grpc in the past, I’d rejected it on the understanding that it requires you to use Protocol Buffers, and we’d already chosen to go with Flatbuffers. But when Sameer Ajmani’s talk pointed out that grpc is essentially a ground-up rewrite of Google’s internal Stubby, that got me to dig deeper. grpc came with net/context, which could easily do what Jeff Dean had talked about. Also, it can help see live rpcs and track the slowest ones.

Overall, there were a lot of advantages to switching to grpc. But, we didn’t want to give up the performance benefits of Flatbuffers.

So, digging deeper, we found that grpc did support custom encoding. And we implemented it. This is the whole equivalent code implemented in grpc:

/*
 * Copyright 2016 DGraph Labs, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *         http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package worker

import (
    "log"

    "google.golang.org/grpc"
)

// PayloadCodec is a codec for grpc that passes the Data bytes of a *Payload
// through as-is: Marshal returns them and Unmarshal stores the received
// bytes back, without any further encoding or decoding.
type PayloadCodec struct{}

// Marshal returns the Data bytes of v, which must be a *Payload, without
// copying or encoding them. Any other type is reported through log.Fatalf,
// which terminates the process.
func (cb *PayloadCodec) Marshal(v interface{}) ([]byte, error) {
    switch payload := v.(type) {
    case *Payload:
        return payload.Data, nil
    default:
        log.Fatalf("Invalid type of struct: %+v", v)
        return nil, nil
    }
}

// Unmarshal stores data, without copying or decoding it, in the Data field
// of v, which must be a *Payload. Any other type is reported through
// log.Fatalf, which terminates the process.
func (cb *PayloadCodec) Unmarshal(data []byte, v interface{}) error {
    switch target := v.(type) {
    case *Payload:
        target.Data = data
    default:
        log.Fatalf("Invalid type of struct: %+v", v)
    }
    return nil
}

// String returns the fixed name of this codec, "worker.PayloadCodec".
func (cb *PayloadCodec) String() string {
    return "worker.PayloadCodec"
}

// Pool hands out grpc client connections to Addr. Idle connections wait in
// the buffered conns channel; Get dials a new one when it is empty, and Put
// closes a returned connection when it is full.
type Pool struct {
    conns chan *grpc.ClientConn
    Addr  string
}

// NewPool creates a pool for addr that keeps at most maxCap idle connections
// and eagerly dials the first one.
//
// A maxCap below 1 is treated as 1: the freshly dialed connection is parked
// in the channel before returning, so the channel needs room for at least
// one entry. An unbuffered channel would block that send forever, and a
// negative size would make the channel allocation panic.
//
// If the initial dial fails, the error is passed to glog.Fatal.
func NewPool(addr string, maxCap int) *Pool {
    if maxCap < 1 {
        maxCap = 1
    }
    p := &Pool{
        Addr:  addr,
        conns: make(chan *grpc.ClientConn, maxCap),
    }

    conn, err := p.dialNew()
    if err != nil {
        glog.Fatal(err)
        return nil
    }
    p.conns <- conn
    return p
}

// dialNew opens a new grpc client connection to p.Addr. The connection is
// set up without transport security (WithInsecure) and uses PayloadCodec for
// encoding messages.
func (p *Pool) dialNew() (*grpc.ClientConn, error) {
    return grpc.Dial(p.Addr, grpc.WithInsecure(),
        grpc.WithCodec(&PayloadCodec{}))
}

// Get returns an idle connection from the pool without blocking; if none is
// waiting it dials a new one instead. Callers hand the connection back with
// Put.
func (p *Pool) Get() (*grpc.ClientConn, error) {
    select {
    case pooled := <-p.conns:
        return pooled, nil
    default:
    }
    // Nothing idle right now.
    return p.dialNew()
}

// Put hands conn back to the pool without blocking. If the pool is already
// full, conn is closed instead and the result of Close is returned.
func (p *Pool) Put(conn *grpc.ClientConn) error {
    select {
    case p.conns <- conn:
        // Kept for reuse.
    default:
        return conn.Close()
    }
    return nil
}

And this is the proto file with Payload and service:

syntax = "proto3";

package worker;

// Payload is the single message type of the Worker service: every RPC takes
// one as its request and returns one as its response. Data holds the
// message content as raw bytes.
message Payload {
    bytes Data = 1;
}

// Worker declares the RPCs exposed by the service. Each one is a unary call
// that takes a Payload and returns a Payload.
service Worker {
    rpc Hello (Payload) returns (Payload) {}
    rpc GetOrAssign (Payload) returns (Payload) {}
    rpc Mutate (Payload) returns (Payload) {}
    rpc ServeTask (Payload) returns (Payload) {}
}

Conclusion

So, turns out, grpc not only does custom encoding, but it also leads to

  • smaller code footprint.
  • net/context, which in turn allows client to cancel pending rpc requests to servers, among many other benefits.
  • net/trace, which allows tracing of rpcs and long-lived objects.

For more, read the pull request which made this change across our code base. Hope you find this useful.

Also read: