Consistent hashing in bifrost

After some research into choosing a sharding method for my key-value store, I still insist on using consistent hashing, as in the last version.

According to some references like this video, consistent hashing lets clients look up where data lives by its key. As long as clients share the same server information, the lookup will always return the same result, no matter when or where it is performed.

[Figure: the consistent hashing ring, from the video referenced above]

Consistent hashing also avoids the large-scale re-hashing that happens on server node membership changes, which can lead to an unacceptable amount of data migration across servers. The downside is that a lookup in the traditional approach only needs one modulo operation on the hash code, while consistent hashing requires a binary search on the ring.

The core data structure of consistent hashing is a sorted array, which should be thought of as a ring, as shown in the figure above. Each element of the ring is an object containing a node hash and a node address. To construct the ring, clients need to know the server addresses and their weights. Node objects are generated for each server, the proportion of nodes assigned to each server is determined by its weight, and their hash codes must be unique. Every participant should use the same hash function, because different implementations may have different result ranges. The consistent hashing component should also listen to membership changes in order to regenerate the ring and inform member servers to migrate data.
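To make the lookup concrete, here is a minimal, generic sketch of the ring described above; it is only an illustration, not bifrost's actual data structure or API. Nodes are kept in an array sorted by hash, and a key is mapped to the first node whose hash is greater than or equal to the key's hash, wrapping around to the start of the ring.

// Minimal illustration of a consistent hashing ring lookup (not bifrost's API).
#[derive(Debug)]
struct RingNode {
    hash: u64,       // hash code of this virtual node, unique on the ring
    address: String, // address of the server this node belongs to
}

// `ring` must be sorted by `hash`. Returns the address that owns `key_hash`.
fn lookup(ring: &[RingNode], key_hash: u64) -> Option<&str> {
    if ring.is_empty() {
        return None;
    }
    // Binary search for the first node with hash >= key_hash;
    // if there is none, wrap around to the first node on the ring.
    let idx = match ring.binary_search_by(|node| node.hash.cmp(&key_hash)) {
        Ok(i) => i,
        Err(i) => i % ring.len(),
    };
    Some(ring[idx].address.as_str())
}

fn main() {
    let ring = vec![
        RingNode { hash: 100, address: "10.0.0.1:5000".into() },
        RingNode { hash: 500, address: "10.0.0.2:5000".into() },
        RingNode { hash: 900, address: "10.0.0.3:5000".into() },
    ];
    // 950 is past the last node, so it wraps around to the first one.
    assert_eq!(lookup(&ring, 950), Some("10.0.0.1:5000"));
}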

In bifrost, the consistent hashing system is built on top of its client membership system. Users register servers with the bifrost membership service, plus a weight service that tells clients how much load each server can take. For example, for an in-memory key-value store, the weight could be set to how much memory the server has, in MB. The bifrost consistent hashing client will generate a ring of roughly fixed size (around 2048 nodes by default) according to the weights, so users don't need to normalize weights themselves to prevent a very large or small ring from hurting performance.
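Bifrost's exact allocation logic is not shown here, but as a rough sketch, a weight-proportional split of a roughly 2048-node ring could look like this (the function and constant names are made up for illustration):

// Simplified illustration of weight-proportional node allocation (not bifrost's code).
const RING_SIZE: usize = 2048; // approximate total number of virtual nodes

fn nodes_per_server(weights: &[(String, u64)]) -> Vec<(String, usize)> {
    let total: u64 = weights.iter().map(|&(_, w)| w).sum();
    assert!(total > 0, "at least one server must have a positive weight");
    weights
        .iter()
        .map(|(addr, w)| {
            // Each server gets a share of the ring proportional to its weight;
            // rounding means the final ring size is only approximately RING_SIZE.
            let share = (*w as f64 / total as f64 * RING_SIZE as f64).round() as usize;
            (addr.clone(), share.max(1))
        })
        .collect()
}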

The ring is updated automatically when membership changes occur. Thanks to the subscription feature in the bifrost raft state machine, there is no polling; the servers push changes instead. Users can also watch changes for the whole ring or for individual servers in order to migrate data. In the change callback for an individual member, the user gets the new key ranges that member is responsible for, so data can be removed or loaded from outside sources.

Using consistent hashing in bifrost is similar to using client membership. The server, raft service, and heartbeat service for membership should be loaded and prepared. The consistent hashing data structures are built per membership group, which makes it possible to distribute data to servers for different purposes with different lookup tables. So a member must join a group to be reachable through the consistent hashing lookup. As mentioned before, every server also needs to specify its weight, and the weight service needs to be initialized in the bifrost cluster raft service.

Weights::new(&raft_service);

On the member server side, a shell of the service can be created so the server can set its own weight. Please note that at this stage ch is not capable of doing any lookup, because the ring has not been generated yet.

let ch = ConsistentHashing::new(&group_1, &wild_raft_client).unwrap();

Weights can be set by

ch.set_weight(&server_1, 1);

After the weights have been set, the ring can be generated by

ch.init_table().unwrap();
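Putting the server-side steps together (group_1, wild_raft_client, and server_1 are placeholder names from the snippets above):

// Server side: initialize the weight service, create the shell,
// set this server's weight, then generate the ring.
Weights::new(&raft_service);
let ch = ConsistentHashing::new(&group_1, &wild_raft_client).unwrap();
ch.set_weight(&server_1, 1);
ch.init_table().unwrap();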

In clients that will only do lookups, ch and its ring can be generated without additional steps by

let ch = ConsistentHashing::new_client(&group_1, &wild_raft_client).unwrap();

Now ch should be functional and ready to serve lookups.

To look up a string key:

let server = ch.get_server_by_string(&k).unwrap();

The server variable should contain a consistent server name for the key.

If the user already knows the hash code, the server can be looked up directly by

let server = ch.get_server(&key_hash).unwrap();

Users may also want to look up by hashing an object. Bifrost allows users to provide any object that implements serde::Serialize. Bifrost will use bincode to encode the object into binary data and apply the same hash function used by the consistent hashing ring to calculate the hash code:

let server = ch.get_server_by(&M {a: 1, b: 2}).unwrap();
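Here M is just a serializable struct; a minimal definition, assumed only for this example, could be:

use serde::Serialize; // requires serde with the derive feature

#[derive(Serialize)]
struct M {
    a: i32,
    b: i32,
}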

To watch changes to the key ranges that a specific server is responsible for, it is as easy as what we have seen with raft state machine subscriptions.

ch.watch_server_nodes_range_changed(&server_name, move |r| {
    println!("Server key range changed to {:?}", r.unwrap());
});

Users do not need to update the ring manually. When any member of the group leaves or goes offline, its nodes are removed from the ring; when any member joins or comes back online, its nodes are added to the ring, automatically. There may be some lag before changes propagate, so users need to handle short periods of inconsistency themselves.

Consistent hashing in bifrost is meant to be easy to use and self-contained, without other dependencies. Combined with its client membership system, the effort required from users is reduced to a minimum.

Some thoughts about the addressing problem in Nebuchadnezzar

I just paused coding to read some papers [1] [2] [3] [4], looking for the best way to solve the cell addressing problem for Nebuchadnezzar. I met some people from PingCAP before the spring festival. They asked me why I chose consistent hashing, a P2P way of addressing data, for Nebuchadnezzar, instead of an addressing service that clients query to find which server holds the data before going to that server to do the job (they call it a placement driver). PingCAP is building a relational database named TiDB on top of the key-value store TiKV, based on ideas from Google F1 and Spanner. That meeting was an interview for a remote engineer position, held in a restaurant.

Honestly, I couldn't answer the question at the time, because I didn't know how their placement driver works. Google chose this architecture for almost all of their infrastructure software, and it seems the placement driver does not become a bottleneck for the whole system. PingCAP claimed their PD can handle more than a million requests per second, which is enough for almost any use case. On the other hand, Cassandra (which only uses consensus for transactions), Dynamo from Amazon, Trinity from Microsoft, and RAMCloud from Stanford all use or propose a P2P architecture on top of consistent hashing.

The benefit of PD and range sharding is that data can move around the cluster at will. That means data can be stored in order by its fields and fetched in sequence easily, and if one server cannot hold the whole sequence, PD can dispatch data to other servers. To locate the data, clients have to query PD for its location. With PingCAP's PD, it takes at least 1 network round trip (when the location of the data entry is cached) and up to 3 (if the cached location is stale) to fetch the data from the actual server.

Consistent hashing doesn't need a central service to hold data locations. The only information it requires is the members and their weights; the lookup table is generated by hashing, sorting, and generating range slots. To look up a key, clients hash it and use binary search to find the range slot it belongs to; each slot maps to a server address. Servers maintain their own hash tables to the actual data. There is only one network round trip, and the addressing can be finished on the client. The number of range slots does not grow with the number of entries the system holds, and it is also configurable. The downside of this approach is that range queries are rather difficult to implement and also inefficient compared to the PD approach.

It seems PingCAP considered the P2P way but rejected it due to its complexity, and because the extra network round trips through PD are insignificant to the performance of their product. That is understandable, because TiKV is a key-value store backed by RocksDB, whose data persists to stable hard drives. Each operation on TiKV may also trigger a disk operation, which is expected. Disk operations may not be faster than the network (disk latency is about 1 to 15 milliseconds depending on the actual device, while network latency should be about 1 millisecond or less), so additional network round trips will not make much impact on such a system. In TiKV, due to its replication, data also needs to replicate to at least one follower, which takes at least one more network round trip; it is designed that way to ensure data safety.

Nebuchadnezzar is a key-value store built to power my graph engine Morpheus. Morpheus enables users to do basic CRUD and graph traversal by performing parallel breadth-first search. Each vertex or edge is stored as a cell in the key-value store. When performing graph traversal operations, Morpheus does a get operation for every cell of the vertices, edges, and metadata it requires. Because graph data is all about relations and its access pattern is unpredictable, we can consider the pattern mostly random and rarely sequential. Another factor to consider is latency: because every traversal operation is a series of random accesses to the key-value store, it is essential to finish every operation as soon as possible. With those ideas in mind, Nebuchadnezzar is designed as an in-memory key-value store that does not immediately write data to disk and has no internal replication. It is similar to Redis and memcached, with more features like sharding and schema support for memory efficiency.

A low-latency distributed storage system makes the addressing problem more sensitive than in traditional data stores backed by hard disk drives. Because we have made so much effort to decrease latency to less than 1 ms on a single server, redundant network round trips are unacceptable. It should also be considered that, with a huge amount of random access, the cache mechanism mentioned for the PD client will fail due to frequent cache misses. It can be foreseen that there will always be at least 2 network round trips (even with batching) for the PD approach.

This makes consistent hashing preferable for Nebuchadnezzar. Addressing takes no more than O(log n), where n is the number of range slots, which is effectively a constant. There will also be one and at most one network round trip, so overall latency is more controllable.

The range query problem over a consistent hash table still remains. Although Nebuchadnezzar is built for high-performance random access, range queries would let us implement indexes for client range queries, which are essential for some use cases. This was not solved in the first version and was considered almost impossible, because after hashing, data keys lose their underlying ordering and the data is dispatched to range slots in a uniform distribution. There is no way to determine data locations by range from the range slots.

There are actually some workarounds for the range query problem over a consistent hash table. [3] proposed using a prefix tree (PHT) and [4] proposed using a segment tree (DST), for strings and numbers respectively. Both workarounds require another data structure to keep track of the data keys: range queries run over that data structure, and consistent hashing is then used to fetch the data.
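A much-simplified sketch of that pattern is shown below, with an ordinary in-memory BTreeSet standing in for a distributed PHT or DST: the ordered index answers the range query over keys, and consistent hashing is only used to locate each matching key's server (the locate closure here is a stand-in for that lookup).

use std::collections::BTreeSet;

// Stand-in for a distributed ordered index (a PHT or DST in the papers);
// here it is just a local set of keys kept in sorted order.
struct RangeIndex {
    keys: BTreeSet<u64>,
}

impl RangeIndex {
    fn new() -> Self {
        RangeIndex { keys: BTreeSet::new() }
    }

    fn insert(&mut self, key: u64) {
        self.keys.insert(key);
    }

    // Answer a range query over keys, then resolve each matching key to a
    // server address through the consistent hashing lookup passed in as `locate`.
    fn range_servers<F>(&self, low: u64, high: u64, locate: F) -> Vec<(u64, String)>
    where
        F: Fn(u64) -> String,
    {
        self.keys
            .range(low..=high)
            .map(|&k| (k, locate(k)))
            .collect()
    }
}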

The only problem left is engineering: we have to choose how to distribute the data structure over cluster servers at large scale. Fortunately, both PHT and DST are tree data structures, which means they can be split into fine-grained sub-trees on different servers that execute commands in parallel if the tree is too large to fit on one server.

But this also leads to new problems: in the worst case we have to reach multiple servers to create and delete index entries for keys, which increases latency for every command related to the index. Although we can run index maintenance in coroutines so it does not block the user thread, users may then suffer from index inconsistency. In the actual implementation, I will let users make that trade-off.

I will keep the original architecture from the first version for the addressing problem in the next generation of Nebuchadnezzar. It is the result of a compromise for performance. If I have some time later, I may build another system without such low-latency requirements on the PD approach.

bifrost: distributed system framework for rust

It has been a while since I last published an article about my recent work. I am working on a distributed system framework for rust applications. Although there are already some mature libraries for this purpose, none of them are for rust, which means there are tons of wheels to reinvent.

Bifrost is a rust library for building robust distributed systems. It does not need any third-party software like Zookeeper, etcd, or consul to ensure consensus. Bifrost ships with its own RPC and raft state machine implementations as basic building blocks, because there is no stable library for either. Bifrost provides a convenient, customizable, and fast RPC that does not require writing a protocol file and running a third-party program to generate server traits and client stubs. It also ships with a simple yet flexible raft state machine framework to build simple data structures like a lookup map, or more complex systems like monitoring client membership changes. It also has the potential to support massive replicated data stores like tikv on top of it (although bifrost does not yet support multi-raft).

[Figure: bifrost architecture]

The idea of using raft is to ensure high availability by replicating logs to a majority of machines before responding to the request. A minority of crashed or slow servers will not harm data integrity. It can also scale read capacity almost infinitely, but write capacity is limited by the individual servers in the cluster.

To define a raft state machine, users provide an action for each function, which determines how requests to that function are processed. Bifrost currently supports 3 kinds of actions: command, query and subscribe. For command functions, clients send requests to the leader of the cluster; the leader appends the request to the log, replicates it to followers, commits, and returns the result. For query requests, the client sends the request to any server in the cluster; the server executes the query immediately and returns the result along with its last log id; the client checks whether the returned log id is larger than or equal to the last log id it has seen; if not, the client rejects the result and finds another server to try again. For subscribe, the client starts a server to listen for subscription messages; the client also sends a command to the leader to append the subscription to the configuration sub state machine, which replicates the subscriptions to its followers; when events that match the subscription happen, only the leader sends messages to the client subscription servers.

For example, a very simple state machine can be defined like this:

raft_state_machine! {
    def cmd set(v: $t);
    def qry get() -> $t;
    def sub on_changed() -> ($t, $t);
}

Users have to consider which action to use for each function, because in the generated query (qry) functions the state machine is immutable. The only way to mutate the state machine is to use a command (cmd) action.

impl StateMachineCmds for Value {
    fn set(&mut self, v: $t) -> Result<(),()> {
        if let Some(ref callback) = self.callback {
            let old = self.val.clone();
            // Notify subscribers of the on_changed event with the old and new values.
            callback.notify(&commands::on_changed{}, Ok((old, v.clone())));
        }
        self.val = v;
        Ok(())
    }
    fn get(&self) -> Result<$t, ()> {
        Ok(self.val.clone())
    }
}

The raft_state_machine macro will not generate trait functions for subscribe (sub) actions. In the state machine trait implementation, subscriptions should be triggered inside command (cmd) functions, like the beginning of the set command above. You can read the full example here.

Using subscriptions is as easy as invoking the subscribe function on the client with a pattern to match (it can also be empty) and a closure to receive the messages. For example, to subscribe to new entries inserted into a map, the function can be defined as:

def sub on_inserted() -> ($kt, $vt);

To use the subscribe function:

sm_client.on_inserted(move |res| {
    if let Ok((key, value)) = res {
        println!("GOT INSERT CALLBACK {:?} -> {:?}", key, value);
        assert_eq!(inserted_stash.get(&key).unwrap(), &value);
    }
});

Sometimes we need to receive only specific kinds of messages, restricted by some condition. Bifrost provides a way to subscribe to messages with certain parameters. Such functions can be defined as:

def sub on_key_inserted(k: $kt) -> $vt;

We have to notify the trigger to send messages to subscribers:

callback.notify(&commands::on_key_inserted{k: k.clone()}, Ok(v.clone()));

To receive the messages, in this case, users have to provide the key they want to subscribe to; here it is a clone of sk1.

sm_client.on_key_inserted(|res| {
    if let Ok(value) = res {
        println!("GOT K1 CALLBACK {:?}", value);
        assert_eq!(&String::from("v1"), &value);
    }
}, sk1.clone());

The RPC and raft state machine frameworks are both multiplexed. An RPC server and client can serve multiple services on one port, and the state machine framework can handle more than one sub state machine.

[Figure: multiplexing diagram]

This enables users to reuse resources in a flexible way. Users assemble servers and raft state machines with services. For example, in my client membership tests:

let addr = String::from("127.0.0.1:2100");
let raft_service = RaftService::new(Options {
    storage: Storage::Default(),
    address: addr.clone(),
    service_id: 0,
});
let server = Server::new(vec!((0, raft_service.clone())));
let heartbeat_service = Membership::new(&server, &raft_service);
Server::listen_and_resume(server.clone(), &addr);
RaftService::start(&raft_service);
raft_service.bootstrap();

Users define the services, raft_service and heartbeat_service in this example, and initialize the server with one or more of them. Users can also call register_service to add more after initialization, like the Membership::new function does in the example. If required, users can hook more than one raft service, or other services, into one server reactor; the only requirement is to use a different service id for each registration.
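For example, a second raft service could be hooked into the same server under an unused id. This is only a sketch that assumes the Options fields and calls shown above; the id 1 is arbitrary.

// A second raft service on the same server, registered under a different id.
let raft_service_2 = RaftService::new(Options {
    storage: Storage::Default(),
    address: addr.clone(),
    service_id: 1, // must not collide with the first raft service (id 0)
});
server.register_service(1, raft_service_2.clone());
RaftService::start(&raft_service_2);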

The state machine framework shares the same idea as the RPC. Users need to register sub state machines with a RaftService reference to make it do anything meaningful. In the client membership implementation, the Membership::new initialization function does those jobs for the user:

raft_service.register_state_machine(Box::new(Membership {
    heartbeat: heartbeat_service.clone(),
    groups: HashMap::new(),
    members: HashMap::new(),
}));
server.register_service(DEFAULT_SERVICE_ID, heartbeat_service);

The multiplexing upgrade was done during the recent spring festival. It did increase programming complexity, but it is more resource efficient while doing the same job.

Bifrost also contains some utilities that may come in handy. The notable one is the binding feature, borrowed from the Clojure programming language. Users can define a binding variable with a default value. The variable can be rebound at any time, and the new value will be accessible anywhere inside the binding block. The binding is thread-local, which means binding values to the same variable in different threads will not interfere with each other. For example:

def_bindings! {
    bind val IS_LEADER: bool = false;
}

This defines a value binding variable named IS_LEADER with a default value of false. It can be rebound to another value with a macro block:

with_bindings!(IS_LEADER: is_leader(meta) => {
    meta.state_machine.write().commit_cmd(&entry)
})

Then, anywhere inside the function commit_cmd, IS_LEADER can be read through its get function and will return the value assigned by the binding. Outside the with_bindings macro block, or in other threads that have not bound any value, the value of the binding will always be false in this case.

This is useful when passing values through function parameters is undesirable. In the example above, only a small number of sub state machines need to know whether the current state machine is running on the leader raft service, but it is required in some sub state machines like subscription (only the leader notifies subscribers). Passing this parameter to every sub state machine would be unnecessary. Bindings make for cleaner code structure and less boilerplate.

Bifrost supports two kinds of bindings. The first kind, which you have already seen, is values; it is suitable for primitive types such as u64, i8 and bool. The other kind is references: bifrost wraps the default data or the bound data in an Arc reference counting container, and each time a user gets the binding they receive a reference. It is suitable for objects such as String or HashMap.
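As a rough illustration of what a reference binding does under the hood, here is a hand-rolled, much-simplified equivalent using a std thread-local and Arc. This is not bifrost's def_bindings macro, just the underlying idea: a thread-local slot holding an Arc that can be temporarily swapped out.

use std::cell::RefCell;
use std::sync::Arc;

thread_local! {
    // Thread-local slot holding the current binding; starts with the default value.
    static CONFIG: RefCell<Arc<String>> = RefCell::new(Arc::new(String::from("default")));
}

// Temporarily rebind the value for the duration of `f`, then restore the old one.
fn with_config<R>(value: Arc<String>, f: impl FnOnce() -> R) -> R {
    let old = CONFIG.with(|c| c.replace(value));
    let result = f();
    CONFIG.with(|c| *c.borrow_mut() = old);
    result
}

// Every read returns a cheap Arc clone, i.e. a reference to the bound object.
fn get_config() -> Arc<String> {
    CONFIG.with(|c| c.borrow().clone())
}

fn main() {
    assert_eq!(*get_config(), "default");
    with_config(Arc::new(String::from("leader")), || {
        // Inside the binding block the rebound value is visible.
        assert_eq!(*get_config(), "leader");
    });
    // Outside the block the default value is back.
    assert_eq!(*get_config(), "default");
}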

There is still much more to introduce about bifrost. In the next articles, I will discuss the architecture design of each of these two modules.