Consistent hashing in bifrost

After some research into sharding methods for my key-value store, I still plan to use consistent hashing, as in the last version.

According to references like this video, consistent hashing lets clients look up where data lives by its key. As long as clients share the same view of the servers, a lookup returns the same result whenever and wherever it is performed.

[Figure: a consistent hashing ring]

Consistent hashing also avoids the large-scale re-hashing that plain modulo sharding suffers when server membership changes, which would otherwise cause an unacceptable amount of data migration across servers. The downside is that a traditional lookup only needs one modulo over the hash code, while consistent hashing requires a binary search on the ring.

The core data structure behind consistent hashing is a sorted array, treated as a ring like the one in the figure above. Each element of the ring is an object holding a node hash and a node address. To construct the ring, clients need to know the server addresses and their weights. Node objects are generated for each server, the ratio of nodes per server is determined by the weights, and every node hash must be unique. Everyone must use the same hash function, because different implementations may produce different result ranges. The consistent hashing component should also listen for membership changes, so it can regenerate the ring and tell member servers to migrate data.
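
To make that concrete, here is a minimal sketch of the idea, assuming a ring of (node hash, server address) pairs; the Ring type and its methods are made up for illustration and are not bifrost's actual implementation:

// A toy consistent hashing ring: a sorted array searched with binary search.
// Assumes the ring is non-empty.
struct Ring {
    nodes: Vec<(u64, String)>, // (node hash, server address)
}

impl Ring {
    fn new(mut nodes: Vec<(u64, String)>) -> Ring {
        nodes.sort_by_key(|n| n.0); // sorting is what turns the array into a "ring"
        Ring { nodes: nodes }
    }

    // Find the first node clockwise from the key's hash, wrapping around
    // to the first node when the hash falls past the last one.
    fn lookup(&self, key_hash: u64) -> &str {
        let idx = match self.nodes.binary_search_by_key(&key_hash, |n| n.0) {
            Ok(i) => i,
            Err(i) => i % self.nodes.len(),
        };
        &self.nodes[idx].1
    }
}

Because the array is sorted and the hash function is shared, every client holding the same node list computes the same server for a key.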

In bifrost, the consistent hashing system is built on its client membership system. Users register a server with the bifrost membership service, plus a weight service that tells clients how much load the server can take. For an in-memory key-value store, for example, the weight could be set to how much memory the server has, in MB. Bifrost consistent hashing clients generate a ring of roughly fixed size (around 2048 nodes by default) according to the weights. Users don't need to normalize the weights themselves to avoid a very large or very small ring that would hurt performance.
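
For a sense of how weights could translate into node counts, here is an illustration of the proportional split (not bifrost's code; nodes_per_server is a hypothetical helper), which is why raw weights of any scale work:

// Hypothetical: distribute ~ring_size virtual nodes across servers in
// proportion to their advertised weights. No manual normalization needed.
fn nodes_per_server(weights: &[(&str, u64)], ring_size: u64) -> Vec<(String, u64)> {
    let total: u64 = weights.iter().map(|&(_, w)| w).sum();
    weights
        .iter()
        .map(|&(name, w)| (name.to_string(), ring_size * w / total))
        .collect()
}

// A 4096 MB server gets twice the nodes of a 2048 MB one:
// nodes_per_server(&[("a", 2048), ("b", 4096)], 2048)
//     == vec![("a".to_string(), 682), ("b".to_string(), 1365)]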

The ring is updated automatically when membership changes. Thanks to the subscription feature of the bifrost raft state machine, this works by server push rather than polling. Users can also watch changes for the whole group or for an individual server in order to migrate data. In the change callback for an individual member, the user receives the new key ranges that member owns, so it can remove data or load it from outside sources.

Using consistent hashing in bifrost is similar to using client membership. The server, the raft service, and the heartbeat service for membership all need to be loaded and prepared. The consistent hashing data structures are built in terms of membership groups, which allows data to be distributed to servers for different purposes with different lookup tables. A member must therefore join a group to be reachable through consistent hashing lookups. As mentioned before, every server needs to specify its weight, and the weight service must be initialized in the bifrost cluster raft service.

Weights::new(&raft_service);

On the member server side, the shell of the service can be created so the server can set a weight for itself. Note that at this stage ch is not yet capable of any lookup, because the ring has not been generated.

let ch = ConsistentHashing::new(&group_1, &wild_raft_client).unwrap();

Weights can be set by

ch.set_weight(&server_1, 1);

After the weights are set, the ring can be generated by

ch.init_table().unwrap();

In clients that only need to do lookups, ch and its ring can be generated without these extra steps by

let ch = ConsistentHashing::new_client(&group_1, &wild_raft_client).unwrap();

Now ch should be functional and ready to serve lookups.

To look up a string key:

let server = ch.get_server_by_string(&k).unwrap();

The server variable will contain a consistent server name for that key.

If the user already knows the hash code, the server can be looked up directly by

let server = ch.get_server(&key_hash).unwrap();

Users may also want to look up by hashing an object. Bifrost allows any object that implements serde::Serialize: it uses bincode to encode the object into binary data, then runs the same hash function the ring uses to compute the hash code:

let server = ch.get_server_by(&M {a: 1, b: 2}).unwrap();

Watching changes to the key ranges a specific server is responsible for is as easy as the raft state machine subscriptions we met before.

ch.watch_server_nodes_range_changed(&server_name, move |r| {
    println!("Server key range changed to {:?}", r.unwrap());
});

Users never need to update the ring manually. When a member of the group leaves or goes offline, its nodes are removed from the ring; when a member joins or comes back online, its nodes are added back, automatically. There may be some lag before the ring reacts, so users need to handle short periods of inconsistency themselves.

Consistent hashing in bifrost is meant to be easy to use and self-contained, without external dependencies. Combined with its client membership system, the effort required from users is reduced to a minimum.

bifrost: distributed system framework for rust

It has been a while since I last published an article about my recent work. I am working on a distributed system framework for rust applications. Although there are already some mature libraries for this purpose, none of them are for rust, which means there are tons of wheels to reinvent.

Bifrost is a rust library for building robust distributed systems. It does not need any third-party software like ZooKeeper, etcd, or consul to ensure consensus. Bifrost ships with its own RPC and raft state machine implementations as basic building blocks, because no stable library exists for either. Its RPC is convenient, customizable, and fast, and does not require writing a protocol file and running a third-party program to generate server traits and client stubs. It also ships with a simple yet flexible raft state machine framework for building simple data structures like a lookup map, or more complex systems like monitoring client membership changes. It even has the potential to support a massive replicated data store like tikv on top of it (although bifrost does not yet support multi-raft).

[Figure: bifrost architecture]

The idea of using raft is to ensure high availability by replicating logs to a majority of machines before responding to a request. A minority of crashed or slow servers will not harm data integrity. Read capacity can scale virtually without limit, but write capacity is bounded by the individual servers in the cluster.

To define a raft state machine, users assign an action to each function to determine how its requests are processed. Bifrost currently supports 3 kinds of actions: command, query, and subscribe. For command functions, clients send requests to the leader of the cluster; the leader appends the request to its logs, replicates it to the followers, commits it, and returns the result. For query requests, a client may send the request to any server in the cluster; the server executes the query immediately and returns the result together with its last log id. The client checks whether the returned log id is larger than or equal to the last log id it has seen; if not, it rejects the result and retries on another server. For subscribe, the client starts a server to listen for subscription messages, and also sends a command to the leader to append the subscription to the configuration sub state machine, which replicates the subscriptions to the followers; when an event matching the subscription happens, only the leader sends a message to the client's subscription server.
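
The client-side staleness check for queries boils down to a comparison like the following sketch (accept_query_reply is a made-up name for the rule described above, not a bifrost API):

// Accept a query reply only if the answering server's log has caught up
// with the newest log id this client has already observed.
fn accept_query_reply(client_last_log_id: u64, reply_log_id: u64) -> bool {
    reply_log_id >= client_last_log_id
}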

For example, a very simple state machine can be defined like this:

raft_state_machine! {
    def cmd set(v: $t);
    def qry get() -> $t;
    def sub on_changed() -> ($t, $t);
}

Users have to consider which action to use for each function, because generated query (qry) functions treat the state machine as immutable; the only way to mutate the state machine is through a command (cmd) action.

impl StateMachineCmds for Value {
    fn set(&mut self, v: $t) -> Result<(), ()> {
        if let Some(ref callback) = self.callback {
            // notify subscribers with the (old, new) pair
            let old = self.val.clone();
            callback.notify(&commands::on_changed{}, Ok((old, v.clone())));
        }
        self.val = v;
        Ok(())
    }
    fn get(&self) -> Result<$t, ()> {
        Ok(self.val.clone())
    }
}

The raft_state_machine macro will not generate trait functions for subscribe (sub) actions. In the state machine trait implementation, subscriptions should be triggered inside command (cmd) functions, as in the first few lines of the set command above. You can read the full example here.

Using a subscription is as easy as invoking the subscribe function on the client, with a pattern to match (it can also be empty) and a closure to receive the messages. For example, to subscribe to new entries inserted into a map, the function can be defined as:

def sub on_inserted() -> ($kt, $vt);

To use the subscribe function:

sm_client.on_inserted(move |res| {
    if let Ok((key, value)) = res {
        println!("GOT INSERT CALLBACK {:?} -> {:?}", key, value);
        assert_eq!(inserted_stash.get(&key).unwrap(), &value);
    }
});

Sometimes we need to receive only certain kinds of messages, subject to some constraint. Bifrost introduces a way to subscribe to messages with specific parameters. Such functions can be defined as:

def sub on_key_inserted(k: $kt) -> $vt;

We have to notify the trigger so it sends messages to the subscribers:

callback.notify(&commands::on_key_inserted{k: k.clone()}, Ok(v.clone()));

To receive the messages, users have to provide the key they want to subscribe to, a clone of sk1 here:

sm_client.on_key_inserted(|res| {
    if let Ok(value) = res {
        println!("GOT K1 CALLBACK {:?}", value);
        assert_eq!(&String::from("v1"), &value);
    }
}, sk1.clone());

The RPC and raft state machine frameworks are both multiplexed. One RPC server and client can support multiple services on a single port, and the state machine framework can likewise host more than one sub state machine.

[Figure: multiplexing diagram]

This enables users to reuse resources in a flexible way. Users assemble servers and raft state machines with services. For example, in my client membership tests:

let addr = String::from("127.0.0.1:2100");
let raft_service = RaftService::new(Options {
    storage: Storage::Default(),
    address: addr.clone(),
    service_id: 0,
});
let server = Server::new(vec!((0, raft_service.clone())));
let heartbeat_service = Membership::new(&server, &raft_service);
Server::listen_and_resume(server.clone(), &addr);
RaftService::start(&raft_service);
raft_service.bootstrap();

Users need to define their services, raft_service and heartbeat_service in this example, and initialize the server with one or more of them. Users can also call register_service to add more services after initialization, as the Membership::new function does in the example. If required, more than one raft service, or any other service, can be hooked up to a single server reactor. The only requirement is that each registration uses a different service id.
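
For instance, a second raft service could share the same server reactor, in a hypothetical assembly modeled on the test snippet above (the ids 1 and 2 are arbitrary; only their uniqueness matters):

// Hypothetical: two raft services multiplexed on one server.
let raft_a = RaftService::new(Options {
    storage: Storage::Default(),
    address: addr.clone(),
    service_id: 1,
});
let raft_b = RaftService::new(Options {
    storage: Storage::Default(),
    address: addr.clone(),
    service_id: 2,
});
let server = Server::new(vec!((1, raft_a.clone()), (2, raft_b.clone())));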

The state machine framework shares the same idea as the RPC layer. Users register sub state machines with a RaftService reference to make it meaningful. In the client membership implementation, the Membership::new initialization function does these jobs for the user:

raft_service.register_state_machine(Box::new(Membership {
    heartbeat: heartbeat_service.clone(),
    groups: HashMap::new(),
    members: HashMap::new(),
}));
server.register_service(DEFAULT_SERVICE_ID, heartbeat_service);

The multiplexing upgrade was finished during the recent spring festival. It did increase programming complexity, but it is more resource efficient while doing the same job.

Bifrost also contains some utilities that may come in handy. The notable one is the binding feature, borrowed from the Clojure programming language. Users can define a binding variable with a default value. The variable can be rebound at any time, and the new value is accessible anywhere inside the binding block. Bindings are thread-local, which means binding values to the same variable in different threads will not interfere with each other. For example:

def_bindings! {
    bind val IS_LEADER: bool = false;
}

This defines a value binding named IS_LEADER with a default value of false. It can be rebound to another value in a macro block:

with_bindings!(IS_LEADER: is_leader(meta) => {
    meta.state_machine.write().commit_cmd(&entry)
})

Then, anywhere inside the function commit_cmd, IS_LEADER can be read, via its get function, as the value just assigned. Outside the with_bindings macro block, or in threads that have not bound a value, the binding always reads as the default, false in this case.

This is useful when passing values through function parameters is undesirable. In the example above, only a few sub state machines need to know whether the current state machine is running on the leader raft service, but some, like subscription, require it (only the leader may notify subscribers). Passing this parameter to every sub state machine would be unnecessary. Bindings give a cleaner code structure with less boilerplate.

Bifrost supports two kinds of bindings. The first kind, seen above, is values, suitable for primitive types such as u64, i8, and bool. The other kind is references: bifrost wraps the default or bound data in an Arc reference-counting container, and every get on the binding returns a reference. This kind suits objects such as String or HashMap.

There is still much more to introduce about bifrost. In the next articles, I will discuss the architecture design of each of these two modules.

Send, Sync, RefCell, Mutex and RwLock: how Rust ensures thread safety

I have been working on bifrost for the whole month, and I expect it to be the foundation of my future projects. Although the Raft algorithm itself is simple to understand, building a usable library like ZooKeeper or etcd still takes real effort.

While implementing reads from state machines, which can execute concurrently on both the leader and up-to-date followers to improve read performance, I worked out for myself how rust ensures thread safety.

Let's rehearse how a variable is mutated and what happens under the hood.

First, every mutable variable needs the 'mut' prefix to tell the compiler it is mutable. The compiler then keeps an eye on such variables to prevent potentially inconsistent behavior.
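
In rust that looks like:

let x = 5;
// x = 6; // compile error: cannot assign twice to immutable variable `x`
let mut y = 5;
y = 6; // fine: `y` was declared mutable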

For example, in Java you can write the following without any compiler error. It is a very bad practice though; don't do this at work.

List<Integer> numbers = new ArrayList<>();
numbers.add(1);
for (int i : numbers) {
    numbers.add(i + 1); // compiles, but throws ConcurrentModificationException at run time
}

You might expect to end up with 1 and 2 in the list. In rust, the equivalent code cannot even compile:

let mut numbers = vec!(1);
for i in &numbers {
    numbers.push(i + 1);
}

You will get an error like this:

error[E0502]: cannot borrow `numbers` as mutable because it is also borrowed as immutable
--> src/raft/mod.rs:444:13
    |
443 |         for i in &numbers {
    |                   ------- immutable borrow occurs here
444 |             numbers.push(i + 1);
    |             ^^^^^^^ mutable borrow occurs here
445 |         }
    |         - immutable borrow ends here

As you can see, numbers is borrowed as immutable at the start of the loop, so inside the loop body you can no longer mutate it. Coming from languages that manage memory with GC or ARC, like Java, Python, Ruby, or even Swift, you may get annoyed at seeing an 'Error' where you used to see at most a 'Warning', and find yourself fighting the compiler just to get your code to build.

The very essence of Rust is its zero-cost abstraction. Just as advertised, rust costs no more than malloc and free calls to manage memory at run time, unlike mark-and-sweep GC or reference-counting ARC. That means Rust still uses raw pointers under the hood, and raw pointers are unsafe.

How can rust ensure memory safety at compile time while using raw pointers? It checks how data flows, and when a piece of memory is no longer needed, it frees it immediately.
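
A small example of that instant freeing, using nothing but standard ownership rules:

fn takes_ownership(s: String) {
    println!("{}", s);
} // `s` goes out of scope here; its heap memory is freed immediately

fn main() {
    let s = String::from("hello"); // one heap allocation
    takes_ownership(s);            // ownership moves into the function
    // println!("{}", s);          // compile error: use of moved value `s`
}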

[Figure: how a variable is mutated]

The figure above shows how a variable is mutated. After the mutation (the green box turns into the purple one), the memory for the green box is freed.

Why is the green box freed and new space allocated for the purple one, rather than reused? Think about C++ without any resizable array data structure: you can only allocate a fixed amount of memory and cannot expand it afterwards. The only way to grow the array is to allocate a larger block, copy the contents from the old one, and free the old memory. This is how the array list in the standard library works, which means the array's memory address may change during mutation. In the example above, the address of numbers is unstable inside the loop because of the mutating push call, which risks producing a dangling pointer.

You might presume that the memory reserved for the Vector is always sufficient by calling with_capacity, but the compiler will never accept an assumption like that, because it is deliberately dumb and does not trust its users.
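
You can watch the buffer move yourself; the two addresses usually differ once the capacity is exceeded, though the allocator makes no promise either way:

fn main() {
    let mut v: Vec<i32> = Vec::with_capacity(1);
    v.push(1);
    let before = v.as_ptr();
    v.push(2); // exceeds the reserved capacity, so the buffer reallocates
    let after = v.as_ptr();
    // Typically prints two different addresses: a raw pointer taken
    // before the second push would now dangle.
    println!("{:?} -> {:?}", before, after);
}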

Things get more complicated in concurrent programming. I once chatted with one of my friends after we watched another round of the greatest movie of 2016, <Your Name.>.

[Figure: Your Name. poster]
Good story, great graphics.

He mentioned unexpected behavior he had hit when dealing with threads, shared pointers, and data races in C++, which had cost him a lot of time to figure out what really happened. I have had similar experiences in Java, but things can get worse on bare metal: a pointer shared between threads may be freed while other threads are still using it.

Rust provides mechanisms that protect you from data races even when you don't realize what you are doing or what the consequences are. The doc has an example that shows how to work with threads.

use std::sync::{Arc, Mutex};
use std::thread;

let five = Arc::new(Mutex::new(5));

for _ in 0..10 {
    let five = five.clone();

    thread::spawn(move || {
        let mut number = five.lock().unwrap();

        *number += 1;

        println!("{}", *number); // prints 6
    });
}

This is not how an efficient concurrent hit counter should be designed, but it demonstrates how to share data between threads. Let's decompose it to see what is really there.

First, we use Arc and Mutex from std::sync. You may be familiar with the term Mutex, which is the lock, but Arc is the new thing in the rust world. Arc stands for atomic reference counting. It looks and behaves like Apple's ARC, except that rust requires it to be used explicitly, and it exists for memory management: Arc tracks the references held across threads and frees the memory once the last reference is dropped.

[Figure: reference counting]
You can imagine the rectangles are the threads, all referencing the same data (the green box)

There is also another reference counter named Rc, which has the same interface as Arc but cannot be used across threads. You will get another compile error if you replace Arc with Rc, because Rc is not designed for multi-threading and is not safe to share between threads. Arc can be shared because it uses an atomic counter under the hood. Rust ensures you never mix up Rc and Arc through two traits, Send and Sync. The two usually appear in pairs, but they mean different things. Send means a value can be transferred to another thread safely without triggering a copy, which is what we use Arc for: its clone function creates another counted reference and adds one to the counter, and when that clone is dropped the counter goes down by one. Sync means references to the data can be shared between threads, with every modification properly synchronized, which is what we use Mutex for. If the data is never mutated, we don't really need Sync: in the example above, if we didn't add to the number but only printed its value, the Mutex would not be required.
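
You can see the Send check in action by swapping Arc for Rc; this snippet is expected to be rejected by the compiler:

use std::rc::Rc;
use std::thread;

fn main() {
    let five = Rc::new(5);
    // error[E0277]: `Rc<i32>` cannot be sent between threads safely
    thread::spawn(move || {
        println!("{}", five);
    });
}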

Mutex is a lock: threads acquire it to read or write the data exclusively. This is how Mutex ensures safety when there is a data race, and it also keeps a stable entry point to the data even when mutation moves memory around. Arc cannot do this job alone, because it is only responsible for sharing the data by handing its address to threads via clone. Every time lock() is invoked and the lock is acquired, it returns a MutexGuard as the entry point to the data; the guard also ensures the lock is released when it goes out of scope or is dropped. In the example above, any mutation of the number behind the reference requires the Mutex.
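
The guard's scope is the unlock; there is no explicit unlock call in std:

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(0);
    {
        let mut guard = m.lock().unwrap(); // lock acquired, MutexGuard returned
        *guard += 1;
    } // guard dropped here, lock released automatically
    println!("{}", *m.lock().unwrap()); // prints 1
}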

Another lock is the read-write lock, RwLock, in the rust standard library. Reads do not block each other, but a write lock blocks both reads and writes. In bifrost, when implementing reads for state machines, a read lock is the best fit, because blocking reads when no mutation is in progress is undesirable. Although RwLock resembles Mutex with an extra feature to avoid unnecessary blocking, the two are not interchangeable. RwLock cannot always take Mutex's place, because read() opens a window for unsafety: RwLock does not implement the Sync trait unless every member of the data implements Sync as well. In my own code, when I put a threadpool inside a RwLock, it could not be sent to threads even though the RwLock was wrapped in an Arc; the compiler complained about the missing Sync trait. Replacing the RwLock with a Mutex fixed it, so my final solution was to wrap the threadpool in a Mutex, which passed the compiler. Using the threadpool is a potential mutation of the data structures inside the object, and even read() cannot prevent that, because the threadpool's functions are not marked as mutable for the compiler (common when a library relies on unsafe operations). Since the threadpool itself cannot ensure memory safety, it has to rely on something else, like Mutex.
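
A minimal standard-library example of those blocking rules:

use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    let data = Arc::new(RwLock::new(vec![1, 2, 3]));

    // Readers do not block one another.
    let handles: Vec<_> = (0..4).map(|_| {
        let data = data.clone();
        thread::spawn(move || {
            let r = data.read().unwrap(); // shared read lock
            println!("len = {}", r.len());
        })
    }).collect();
    for h in handles {
        h.join().unwrap();
    }

    // A writer takes the lock exclusively, blocking readers and writers.
    data.write().unwrap().push(4);
}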

In this scenario, RefCell is also not allowed inside the RwLock. RefCell is like a combination of Rc and RwLock without the actual lock part. It is used as a workaround for interior mutability, at some run-time cost. Because it cannot ensure thread safety by itself, I used another RwLock as the replacement. I spent hours figuring out the difference between Mutex and RwLock and finding countermeasures, but thread safety was ensured even though I did not know the whole story at the beginning, avoiding potential crash defects.
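
RefCell runs the same borrow rules at run time instead of compile time, as a plain std example shows:

use std::cell::RefCell;

fn main() {
    let cell = RefCell::new(5);
    *cell.borrow_mut() += 1;        // mutable borrow checked at run time
    println!("{}", *cell.borrow()); // prints 6
    // Holding borrow() and borrow_mut() at once would panic at run time,
    // where a real RwLock would block instead: that is the missing lock part.
}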

Although rust is still at an early stage as a productive systems programming language, lacking mature libraries, it demonstrates that a steep learning curve is worthwhile when it lets even newcomers write safe code without blowing anything up at run time. Given its potential for high-performance software as well, it makes the perfect systems programming language for me.