Bifrost: a distributed system framework for Rust

It has been a while since I last published an article about my recent work. I am working on a distributed system framework for Rust applications. Although there are already some mature libraries for this purpose, none of them are for Rust, which means there are tons of wheels to be reinvented.

Bifrost is a Rust library for building robust distributed systems. It does not need any third-party software like ZooKeeper, etcd or Consul to ensure consensus. Bifrost ships with its own RPC and Raft state machine implementations as basic building blocks, because there are no stable libraries for them. Bifrost provides a convenient, customizable and fast RPC that does not require writing a protocol file and running a third-party program to generate server traits and client stubs. It also ships with a simple yet flexible Raft state machine framework for building simple data structures like a lookup map, or more complex systems like monitoring client membership changes. It also has the potential to support a massive replicated data store like TiKV on top of it (although bifrost does not yet support multi-raft).

[Figure: bifrost architecture]

The idea of using Raft is to ensure high availability by replicating logs to a majority of machines before responding to a request. A minority of crashed or slow servers will not harm data integrity. Read capacity can scale almost without limit, since any server can answer queries, but write capacity is limited by the individual servers in the cluster.
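
To make the majority rule concrete, here is a minimal sketch of the arithmetic involved. It is not taken from bifrost; the function names are made up for illustration, and it assumes the leader counts its own copy of the entry toward the majority.

```rust
/// Smallest number of servers that forms a majority of `cluster_size`.
fn quorum(cluster_size: usize) -> usize {
    cluster_size / 2 + 1
}

/// Number of servers that may crash or lag without blocking commits.
fn tolerated_failures(cluster_size: usize) -> usize {
    cluster_size.saturating_sub(quorum(cluster_size))
}

/// An entry counts as committed once the leader plus the acknowledging
/// followers together reach a quorum.
fn is_committed(cluster_size: usize, follower_acks: usize) -> bool {
    // +1 accounts for the leader's own copy of the entry.
    follower_acks + 1 >= quorum(cluster_size)
}
```

With these definitions a 5-server cluster needs 3 copies of an entry and keeps working with 2 servers down, while a 4-server cluster also needs 3 copies but tolerates only 1 failure.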

To define a Raft state machine, users need to assign an action to each function, which determines how requests to that function are processed. Bifrost currently supports three kinds of actions: command, query and subscribe. For command functions, the client sends the request to the leader of the cluster; the leader appends the request to its log, replicates it to the followers, commits it and returns the result. For query functions, the client sends the request to any server in the cluster; the server executes the query immediately and returns the result together with its last log id. The client checks whether the returned log id is greater than or equal to the last log id it has received; if not, the client rejects the result and tries another server. For subscribe, the client starts a server to listen for subscription messages. The client also needs to send a command to the leader to append a subscription log entry to the configuration sub state machine, which replicates the subscription to the followers. When an event matching the subscription happens, only the leader sends a message to the client's subscription server.
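
The freshness check on query results can be sketched as follows. This is only an illustration of the rule described above, not bifrost's client code: QueryReply, QueryClient and their methods are hypothetical names, and the sketch assumes the client remembers the highest log id it has accepted so far.

```rust
/// Reply to a query: the value plus the responding server's last log id.
struct QueryReply<T> {
    value: T,
    last_log_id: u64,
}

/// Client-side record of the freshest log id seen so far.
struct QueryClient {
    last_log_id: u64,
}

impl QueryClient {
    /// Accepts a reply only if the server is at least as up to date as
    /// anything this client has already observed. Returns None for a
    /// stale reply so the caller can try another server.
    fn accept<T>(&mut self, reply: QueryReply<T>) -> Option<T> {
        if reply.last_log_id >= self.last_log_id {
            self.last_log_id = reply.last_log_id;
            Some(reply.value)
        } else {
            None
        }
    }

    /// Walks through replies from several servers in order and returns
    /// the first one that passes the freshness check.
    fn first_fresh<T>(
        &mut self,
        replies: impl IntoIterator<Item = QueryReply<T>>,
    ) -> Option<T> {
        replies.into_iter().find_map(|reply| self.accept(reply))
    }
}
```

The check prevents a client from reading older state from a lagging follower after it has already seen newer state from another server.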

For example, a very simple state machine can be defined like this:

raft_state_machine! {
    def cmd set(v: $t);
    def qry get() -> $t;
    def sub on_changed() -> ($t, $t);
}

Users have to consider which action to use for each function, because in the generated query (qry) functions the state machine is immutable. The only way to mutate the state machine is through a command (cmd) action.

impl StateMachineCmds for Value {
    fn set(&mut self, v: $t) -> Result<(),()> {
        if let Some(ref callback) = self.callback {
            let old = self.val.clone();
            callback.notify(&commands::on_changed{}, Ok((old, v.clone())));
        }
        self.val = v;
        Ok(())
    }
    fn get(&self) -> Result<$t, ()> {
        Ok(self.val.clone())
    }
}

The raft_state_machine macro does not generate trait functions for subscribe (sub) actions. In the state machine trait implementation, subscriptions should be triggered inside command (cmd) functions, as in the first 4 lines of the set command above. You can read the full example here.

Using a subscription is as easy as invoking the subscribe function on the client with a pattern to match (which can be empty) and a closure to receive the messages. For example, to subscribe to new entries inserted into a map, the function can be defined as:

def sub on_inserted() -> ($kt, $vt);

To use the subscribe function:

sm_client.on_inserted(move |res| {
    if let Ok((key, value)) = res {
        println!("GOT INSERT CALLBACK {:?} -> {:?}", key, value);
        assert_eq!(inserted_stash.get(&key).unwrap(), &value);
    }
});

Sometimes we only want to receive messages that meet certain conditions. Bifrost provides a way to subscribe to messages with specific parameters. Such functions can be defined as:

def sub on_key_inserted(k: $kt) -> $vt;

The state machine has to trigger the notification to send messages to the subscribers:

callback.notify(&commands::on_key_inserted{k: k.clone()}, Ok(v.clone()));

To receive the messages, users have to provide the key they want to subscribe to. In this case, it is a clone of sk1.

sm_client.on_key_inserted(|res| {
    if let Ok(value) = res {
        println!("GOT K1 CALLBACK {:?}", value);
        assert_eq!(&String::from("v1"), &value);
    }
}, sk1.clone());
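
Conceptually, a parameterized subscription means that a notification only reaches the callbacks registered for matching parameters. The sketch below illustrates that idea in isolation. It is a hypothetical, single-process registry, not how bifrost stores or delivers subscriptions, and KeySubscriptions is an invented name.

```rust
use std::collections::HashMap;

/// Hypothetical registry: callbacks grouped by the key they subscribed to.
struct KeySubscriptions<V> {
    by_key: HashMap<String, Vec<Box<dyn Fn(&V)>>>,
}

impl<V> KeySubscriptions<V> {
    fn new() -> Self {
        KeySubscriptions { by_key: HashMap::new() }
    }

    /// Registers `callback` for notifications about `key` only.
    fn subscribe(&mut self, key: &str, callback: impl Fn(&V) + 'static) {
        self.by_key
            .entry(key.to_string())
            .or_default()
            .push(Box::new(callback));
    }

    /// Calls only the callbacks whose key matches; returns how many ran.
    fn notify(&self, key: &str, value: &V) -> usize {
        match self.by_key.get(key) {
            Some(callbacks) => {
                for callback in callbacks {
                    callback(value);
                }
                callbacks.len()
            }
            None => 0,
        }
    }
}
```

A subscriber registered for one key is never called when a different key is inserted, which mirrors the sk1 example above.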

Both the RPC and the Raft state machine framework are multiplexed. An RPC server and client can support multiple services on one port, and the state machine framework can handle more than one sub state machine.

[Figure: multiplexing diagram]

This enables users to reuse resources in a flexible way. Users need to assemble servers and Raft state machines with services. For example, in my client membership tests:

let addr = String::from("127.0.0.1:2100");
let raft_service = RaftService::new(Options {
    storage: Storage::Default(),
    address: addr.clone(),
    service_id: 0,
});
let server = Server::new(vec!((0, raft_service.clone())));
let heartbeat_service = Membership::new(&server, &raft_service);
Server::listen_and_resume(server.clone(), &addr);
RaftService::start(&raft_service);
raft_service.bootstrap();

Users need to define services (raft_service and heartbeat_service in the example) and initialize the server with one or more of them. Users can also use register_service to add others after initialization, as the Membership::new function does in the example. If required, users can hook up more than one Raft service or other services in one server reactor. The only requirement is that each service is registered with a different service id.
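
The reason service ids must be unique becomes clearer with a small model of a multiplexer. The sketch below is a simplified stand-in and not bifrost's Server: the Service trait, Multiplexer, Echo and Reverse are all hypothetical, and requests are plain byte slices rather than real RPC frames.

```rust
use std::collections::HashMap;

/// Hypothetical service interface: bytes in, bytes out.
trait Service {
    fn handle(&self, request: &[u8]) -> Vec<u8>;
}

/// Example service that returns the request unchanged.
struct Echo;
impl Service for Echo {
    fn handle(&self, request: &[u8]) -> Vec<u8> {
        request.to_vec()
    }
}

/// Example service that returns the request bytes in reverse order.
struct Reverse;
impl Service for Reverse {
    fn handle(&self, request: &[u8]) -> Vec<u8> {
        request.iter().rev().copied().collect()
    }
}

/// One endpoint that routes each request by its service id.
struct Multiplexer {
    services: HashMap<u64, Box<dyn Service>>,
}

impl Multiplexer {
    fn new() -> Self {
        Multiplexer { services: HashMap::new() }
    }

    /// Registers a service; returns Err with the id if it is already taken.
    fn register(&mut self, service_id: u64, service: Box<dyn Service>) -> Result<(), u64> {
        if self.services.contains_key(&service_id) {
            return Err(service_id);
        }
        self.services.insert(service_id, service);
        Ok(())
    }

    /// Routes a request to the service registered under `service_id`.
    fn dispatch(&self, service_id: u64, request: &[u8]) -> Option<Vec<u8>> {
        self.services.get(&service_id).map(|s| s.handle(request))
    }
}
```

Because the id is the only routing key, two services sharing one id would be indistinguishable to the dispatcher, so this sketch rejects the second registration.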

The state machine framework shares the same idea with the RPC. Users need to register sub state machines with a RaftService reference for it to do anything meaningful. In the client membership implementation, the Membership::new initialization function does this for users:

raft_service.register_state_machine(Box::new(Membership {
    heartbeat: heartbeat_service.clone(),
    groups: HashMap::new(),
    members: HashMap::new(),
}));
server.register_service(DEFAULT_SERVICE_ID, heartbeat_service);

The multiplexing upgrade was done recently, during the Spring Festival. It did increase programming complexity, but it is a more resource-efficient and preferable way to do the same job.

Bifrost also contains some utilities that may come in handy. The notable one is the binding feature, borrowed from the Clojure programming language. Users can define a binding variable with a default value. The variable can be rebound at any time, and it is accessible anywhere inside the binding block. Bindings are thread-local, which means that binding values to the same variable in different threads will not interfere with each other. For example:

def_bindings! {
    bind val IS_LEADER: bool = false;
}

This defines a value binding variable named IS_LEADER whose default value is false. It can be rebound to another value with a macro block:

with_bindings!(IS_LEADER: is_leader(meta) => {
    meta.state_machine.write().commit_cmd(&entry)
})

Then, anywhere inside the function commit_cmd, IS_LEADER can be read by invoking the get function on the binding, which returns the value assigned in the block. Outside the with_bindings macro block, or in other threads that have not bound any value, the binding always has the value false in this case.
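
The behavior described above can be approximated with the standard library alone. The following is a sketch of the concept, not the code that def_bindings! or with_bindings! expands to: with_is_leader and is_leader are invented helper names, and restoring the previous value through a drop guard is an assumption of this example.

```rust
use std::cell::Cell;

thread_local! {
    // Per-thread slot for the current binding; the default is false.
    static IS_LEADER: Cell<bool> = Cell::new(false);
}

/// Runs `body` with IS_LEADER rebound to `value` on the current thread,
/// then restores the previous value, even if `body` panics.
fn with_is_leader<R>(value: bool, body: impl FnOnce() -> R) -> R {
    struct Restore(bool);
    impl Drop for Restore {
        fn drop(&mut self) {
            IS_LEADER.with(|slot| slot.set(self.0));
        }
    }
    // Swap in the new value and remember the old one in the guard.
    let _restore = Restore(IS_LEADER.with(|slot| slot.replace(value)));
    body()
}

/// Reads the binding as seen by the current thread.
fn is_leader() -> bool {
    IS_LEADER.with(|slot| slot.get())
}
```

Calling is_leader() inside with_is_leader(true, ...) yields true on the same thread, while a thread spawned inside the block still sees the default false, because each thread has its own slot.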

This is useful when passing values to functions as parameters is undesirable. In the example above, only a small number of sub state machines need to know whether the current state machine is running on a leader Raft service, but some of them, like subscription, require it (only the leader can notify subscribers). Passing this parameter to every sub state machine is unnecessary. Bindings make for a cleaner code structure and less boilerplate.

Bifrost supports two kinds of bindings. The first kind, which you have already seen, is values. It is suitable for primitive types such as u64, i8 and bool. The other kind is references: bifrost wraps the default or bound data in an Arc reference-counting container, and each get from the binding returns a reference. It is suitable for objects such as String or HashMap.
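
To show why reference bindings suit larger objects, here is a rough standard-library sketch in which every read hands out a cheap Arc handle instead of copying the data. It is an illustration only: CONFIG, with_config and get_config are hypothetical names, bifrost's macro syntax for reference bindings is not shown, and this simple version does not restore the old value if the body panics.

```rust
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;

thread_local! {
    // Per-thread slot holding a shared handle to the bound map.
    static CONFIG: RefCell<Arc<HashMap<String, String>>> =
        RefCell::new(Arc::new(HashMap::new()));
}

/// Returns a reference-counted handle to the current binding.
/// Cloning the Arc bumps a counter; the map itself is not copied.
fn get_config() -> Arc<HashMap<String, String>> {
    CONFIG.with(|slot| slot.borrow().clone())
}

/// Runs `body` with CONFIG rebound to `value`, then restores the old binding.
fn with_config<R>(value: HashMap<String, String>, body: impl FnOnce() -> R) -> R {
    let previous = CONFIG.with(|slot| slot.replace(Arc::new(value)));
    let result = body();
    CONFIG.with(|slot| *slot.borrow_mut() = previous);
    result
}
```

Two calls to get_config() inside the same block return handles to the same allocation, so readers share one copy of the map.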

There is still much more to introduce about bifrost. In the next articles, I will discuss the architecture design of each of the two modules.
